# AUTOGENERATED FILE! PLEASE DON'T EDIT
"""
.. module:: k1lib
"""
from typing import Callable, Iterator, Tuple, Union, Dict, Any, List
from k1lib import isNumeric; import k1lib, contextlib
import random, torch, math, sys, io, os, numpy as np
__all__ = ["Object", "Range", "Domain", "AutoIncrement", "Wrapper", "Every",
"RunOnce", "MaxDepth", "MovingAvg", "Absorber",
"Settings", "settings", "_settings"]
class Object:
    """Convenience class that acts like :class:`~collections.defaultdict`. You
    can use it like a normal object::

        a = k1lib.Object()
        a.b = 3
        print(a.b) # outputs "3"

    ``__repr__()`` output is pretty nice too:

    .. code-block:: text

        <class '__main__.Object'>, with attrs:
        - b

    You can instantiate it from a dict::

        a = k1lib.Object.fromDict({"b": 3, "c": 4})
        print(a.c) # outputs "4"

    And you can specify a default value, just like defaultdict::

        a = k1lib.Object().withAutoDeclare(lambda: [])
        a.texts.extend(["factorio", "world of warcraft"])
        print(a.texts[0]) # outputs "factorio"

    .. warning::

        Default values only work with variables that don't start with an
        underscore "_".

    Treating it like defaultdict is okay too::

        a = k1lib.Object().withAutoDeclare(lambda: [])
        a["movies"].append("dune")
        print(a.movies[0]) # outputs "dune" """

    def __init__(self):
        # bookkeeping fields; both are hidden from :attr:`state`
        self._defaultValueGenerator = None
        self.repr = None

    @staticmethod
    def fromDict(_dict:Dict[str, Any]):
        """Creates an object with attributes from a dictionary"""
        answer = Object()
        answer.__dict__.update(_dict)
        return answer

    @property
    def state(self) -> dict:
        """Essentially ``__dict__``, but only outputs the fields you
        defined. If your framework intentionally set some attributes, those
        will be reported too, so beware"""
        answer = dict(self.__dict__)
        # pop instead of del, so a missing bookkeeping key is not an error
        answer.pop("_defaultValueGenerator", None)
        answer.pop("repr", None)
        return answer

    def withAutoDeclare(self, defaultValueGenerator):
        """Sets this Object up so that if a field doesn't
        exist, it will automatically create it with a
        default value.

        :param defaultValueGenerator: zero-argument callable producing the default
        :return: this object, to allow chaining"""
        self._defaultValueGenerator = defaultValueGenerator
        return self

    def __getitem__(self, idx): return getattr(self, idx)
    def __setitem__(self, idx, value): setattr(self, idx, value)
    def __iter__(self): yield from self.state.values()
    def __contains__(self, item:str): return item in self.__dict__

    def __getattr__(self, attr):
        """Only invoked when normal attribute lookup fails. Auto declares the
        attribute if a default value generator was configured, else raises
        :class:`AttributeError` naming the missing attribute."""
        # private/dunder names are never auto declared
        if attr.startswith("_"): raise AttributeError(attr)
        if attr == "getdoc": raise AttributeError("This param is used internally in module `IPython.core.oinspect`, so you kinda have to set it specifically yourself instead of relying on auto declare")
        generator = self.__dict__.get("_defaultValueGenerator")
        if generator is not None:
            value = self.__dict__[attr] = generator()
            return value
        raise AttributeError(f"'{type(self).__name__}' object has no attribute '{attr}'")

    def __delitem__(self, key): del self.__dict__[key]

    def withRepr(self, _repr:str):
        """Specify output of ``__repr__()``. Legacy code. You can just
        monkey patch it instead."""
        self.repr = _repr
        return self

    def __repr__(self):
        # a custom (truthy) repr wins; only build the attribute list otherwise
        if self.repr: return self.repr
        _dict = "\n".join(f"- {k}" for k in self.state)
        return f"{type(self)}, with attrs:\n{_dict}"
# Shorthands for negative and positive infinity.
ninf = float("-inf"); inf = float("inf")
class Range:
    """A range of numbers. It's just 2 numbers really: start and stop

    This is essentially a convenience class to provide a nice, clean
    abstraction and to eliminate errors. You can transform values::

        Range(10, 20).toUnit(13) # returns 0.3
        Range(10, 20).fromUnit(0.3) # returns 13
        Range(10, 20).toRange(Range(20, 10), 13) # returns 17

    You can also do random math operations on it::

        (Range(10, 20) * 2 + 3) == Range(23, 43) # returns True
        Range(10, 20) == ~Range(20, 10) # returns True"""

    def __init__(self, start=0, stop=None):
        """Creates a new Range.

        There are different ``__init__`` functions for many situations:

        - Range(2, 11.1): create range [2, 11.1]
        - Range(15.2): creates range [0, 15.2]
        - Range(Range(2, 3)): create range [2, 3]. This serves as sort of a catch-all
        - Range(slice(2, 5, 2)): creates range [2, 5]. Can also be a :class:`range`
        - Range(slice(2, -1), 10): creates range [2, 9]
        - Range([1, 2, 7, 5]): creates range [1, 5]. Can also be a tuple

        :raises AttributeError: if the arguments match none of the forms above
        """
        if isNumeric(start) and isNumeric(stop):
            self.start, self.stop = start, stop
        elif isNumeric(start) and stop is None:
            self.start, self.stop = 0, start
        elif stop is None and isinstance(start, (range, slice, Range)):
            self.start, self.stop = start.start, start.stop
        elif isNumeric(stop) and isinstance(start, slice):
            # resolve negative/missing slice bounds against a sequence of length `stop`
            r = range(stop)[start]; self.start, self.stop = r.start, r.stop
        elif isinstance(start, (list, tuple)):
            self.start, self.stop = start[0], start[-1]
        else: raise AttributeError(f"Don't understand {start} and {stop}")
        self.delta = self.stop - self.start

    def __getitem__(self, index):
        """0 for start, 1 for stop

        You can also pass in a :class:`slice` object, in which case, a range subset
        will be returned. Code kinda looks like this::

            range(start, stop)[index]

        :raises IndexError: for any other index"""
        if isinstance(index, slice):
            return Range(range(self.start, self.stop)[index])
        if index == 0: return self.start
        if index == 1: return self.stop
        raise IndexError(f"Can't get index {index} of range [{self.start}, {self.stop}]")

    def fixOrder(self) -> "Range":
        """If start greater than stop, switch the 2, else do nothing"""
        if self.start > self.stop:
            self.start, self.stop = self.stop, self.start
            # keep the cached width in sync, else toUnit()/fromUnit() go wrong
            self.delta = self.stop - self.start
        return self

    def _common(self, x, f:Callable[[float], float]):
        """Applies ``f`` to a number, to every element of a list/tuple
        (recursively), or to both ends of a range/slice/Range (missing start
        defaults to 0, missing stop to 1)."""
        if isNumeric(x): return f(x)
        if isinstance(x, (list, tuple)):
            return [self._common(elem, f) for elem in x]
        if isinstance(x, (range, slice, Range)):
            lo = x.start if x.start is not None else 0
            hi = x.stop if x.stop is not None else 1
            return Range(self._common(lo, f), self._common(hi, f))
        raise AttributeError(f"Doesn't understand {x}")

    def __iter__(self): yield self.start; yield self.stop

    def intIter(self, step:int=1) -> Iterator[int]:
        """Returns integers within this Range"""
        return range(int(self.start), int(self.stop), step)

    def toUnit(self, x):
        """Converts x from current range to [0, 1] range. Example::

            r = Range(2, 10)
            r.toUnit(5) # will return 0.375, as that is (5-2)/(10-2)

        You can actually pass in a lot in place of x::

            r = Range(0, 10)
            r.toUnit([5, 3, 6]) # will be [0.5, 0.3, 0.6]. Can also be a tuple
            r.toUnit(slice(5, 6)) # will be Range(0.5, 0.6). Can also be a range, or Range

        .. note::

            In the last case, if ``start`` is None, it gets defaulted to 0, and
            if ``end`` is None, it gets defaulted to 1

        If this range has zero width, every conversion gives ``nan``.
        """
        def f(x):
            if self.delta == 0: return float("nan")
            return (x - self.start) / self.delta
        return self._common(x, f)

    def fromUnit(self, x):
        """Converts x from [0, 1] range to this range. Example::

            r = Range(0, 10)
            r.fromUnit(0.3) # will return 3

        x can be a lot of things, see :meth:`toUnit` for more"""
        return self._common(x, lambda x: x * self.delta + self.start)

    def toRange(self, _range:"Range", x):
        """Converts x from current range to another range. Example::

            r = Range(0, 10)
            r.toRange(Range(0, 100), 6) # will return 60

        x can be a lot of things, see :meth:`toUnit` for more."""
        target = Range(_range) # built once instead of once per element
        return self._common(x, lambda x: target.fromUnit(self.toUnit(x)))

    def fromRange(self, _range:"Range", x):
        """Reverse of :meth:`toRange`, effectively."""
        return _range.toRange(self, x)

    @property
    def range_(self):
        """Returns a :class:`range` object with start and stop values
        rounded off"""
        # +0.001 so that values a hair below an integer still floor to it
        return range(math.floor(self.start+0.001), math.floor(self.stop+0.001))

    @property
    def slice_(self):
        """Returns a :class:`slice` object with start and stop values
        rounded off"""
        return slice(math.floor(self.start+0.001), math.floor(self.stop+0.001))

    @staticmethod
    def proportionalSlice(r1, r2, r1Slice:slice) -> Tuple["Range", "Range"]:
        """Slices r1 and r2 proportionally. Best to explain using an
        example. Let's say you have 2 arrays created from a time-dependent
        procedure like this::

            a = []; b = []
            for t in range(100):
                if t % 3 == 0: a.append(t)
                if t % 5 == 0: b.append(1 - t)
            len(a), len(b) # returns (34, 20)

        a and b are of different lengths, but you want to plot both from 30%
        mark to 50% mark (for a, it's elements 10 -> 17, for b it's 6 -> 10),
        as they are time-dependent. As you can probably tell, to get the indicies
        10, 17, 6, 10 is messy. So, you can do something like this instead::

            r1, r2 = Range.proportionalSlice(Range(len(a)), Range(len(b)), slice(10, 17))

        This will return the Ranges [10, 17] and [5.88, 10]

        Then, you can plot both of them side by side like this::

            fig, axes = plt.subplots(ncols=2)
            axes[0].plot(r1.range_, a[r1.slice_])
            axes[1].plot(r2.range_, b[r2.slice_])
        """
        r1, r2 = Range(r1), Range(r2)
        ar1 = r1[r1Slice]; ar2 = r1.toRange(r2, ar1)
        return ar1, ar2

    def bound(self, rs:Union[range, slice]) -> Union[range, slice]:
        """If input range|slice's stop and start is missing, then use this
        range's start and stop instead."""
        # NOTE(review): `or` also treats an explicit 0 as "missing"; kept as-is
        # because callers may depend on it
        start = rs.start or self.start
        stop = rs.stop or self.stop
        return type(rs)(start, stop)

    def copy(self): return Range(self.start, self.stop)
    def __str__(self): return f"[{self.start}, {self.stop}]"

    def __eq__(self, _range):
        """Compares both ends with a 1e-9 tolerance. Anything :class:`Range`
        can be built from is accepted; other values compare unequal."""
        try: other = Range(_range)
        except (AttributeError, TypeError, IndexError): return NotImplemented
        return (other.start == self.start or abs(other.start - self.start) < 1e-9) and\
            (other.stop == self.stop or abs(other.stop - self.stop) < 1e-9)

    def __contains__(self, x:float): return x >= self.start and x < self.stop
    def __neg__(self): return Range(-self.start, -self.stop)
    def __invert__(self): return Range(self.stop, self.start)
    def __add__(self, num): return Range(self.start + num, self.stop + num)
    def __radd__(self, num): return self + num
    def __mul__(self, num): return Range(self.start * num, self.stop * num)
    def __rmul__(self, num): return self * num
    def __truediv__(self, num): return self * (1/num)
    def __rtruediv__(self, num): raise TypeError("Doesn't make sense to do this!")
    def __round__(self): return Range(round(self.start), round(self.stop))
    def __ceil__(self): return Range(math.ceil(self.start), math.ceil(self.stop))
    def __floor__(self): return Range(math.floor(self.start), math.floor(self.stop))

    def __repr__(self):
        return f"""A range of numbers: [{self.start}, {self.stop}]. Can do:
- r.toUnit(x): will convert x from range [{self.start}, {self.stop}] to [0, 1]
- r.fromUnit(x): will convert x from range [0, 1] to range [{self.start}, {self.stop}]
- r.toRange([a, b], x): will convert x from range [{self.start}, {self.stop}] to range [a, b]
- r[0], r[1], r.start, r.stop: get start and stop values of range
Note: for conversion methods, you can pass in"""
def yieldLowest(r1s:Iterator[Range], r2s:Iterator[Range]):
    """Merges 2 :class:`Range` generators (lengths a and b) into a single
    stream of all a + b objects, ordered so that ranges with smaller start
    points come out first. Assumes that each generator:

    - Does not intersect with itself
    - Is sorted by start point already

    .. warning::

        This method will sometimes yield the same objects given by the Iterators.
        Make sure you copy each :class:`Range` if your use case requires"""
    def emitWhileNotLater(cur, source, other):
        # yields `cur` and its successors for as long as they start no later than
        # `other`; returns the first item held back, or None if `source` ran dry
        while cur is not None and cur.start <= other.start:
            yield cur
            cur = next(source, None)
        return cur

    first, second = iter(r1s), iter(r2s)
    a = next(first, None)
    if a is None:
        yield from second
        return
    b = next(second, None)
    if b is None:
        yield a
        yield from first
        return
    while True:
        a = yield from emitWhileNotLater(a, first, b)
        if a is None:
            yield b
            yield from second
            return
        b = yield from emitWhileNotLater(b, second, a)
        if b is None:
            yield a
            yield from first
            return
def join(r1s:Iterator[Range], r2s:Iterator[Range]):
    """Joins 2 :class:`Range` generators, so that overlaps gets merged
    together. Both inputs must satisfy the requirements of :meth:`yieldLowest`.

    .. warning::

        This method will sometimes yield the same objects given by the Iterators.
        Make sure you copy each :class:`Range` if your use case requires"""
    it = yieldLowest(r1s, r2s); r = next(it, None)
    if r is None: return
    for nr in it:
        if r.stop >= nr.start:
            # overlapping or touching: build a fresh Range, so the inputs stay
            # untouched and the merged range's ``delta`` matches its new stop
            r = Range(r.start, max(r.stop, nr.stop))
        else: yield r; r = nr
    yield r
def neg(rs:List[Range]):
    """Yields the complement R - rs, where R is the set of real numbers:
    the gap before the first range (if it doesn't start at -inf), every gap
    between consecutive ranges, and the gap after the last one (if it
    doesn't stop at inf)."""
    remaining = iter(rs)
    first = next(remaining, None)
    if first is None:
        # nothing to exclude, so the answer is the whole real line
        yield Range(ninf, inf)
        return
    if ninf < first.start: yield Range(ninf, first.start) # check -inf case
    gapStart = first.stop
    for cur in remaining:
        yield Range(gapStart, cur.start)
        gapStart = cur.stop
    if gapStart < inf: yield Range(gapStart, inf)
class Domain:
    def __init__(self, *ranges, dontCheck:bool=False):
        """Creates a new domain.

        :param ranges: each element is a :class:`Range`, although any format will be fine as this selects for that
        :param dontCheck: don't sanitize inputs, intended to boost perf internally only

        A domain is just an array of :class:`Range` that represents what intervals on
        the real number line is chosen. Some examples::

            inf = float("inf") # shorthand for infinity
            Domain([5, 7.5], [2, 3]) # represents "[2, 3) U [5, 7.5)"
            Domain([2, 3.2], [3, 8]) # represents "[2, 8)" as overlaps are merged
            -Domain([2, 3]) # represents "(-inf, 2) U [3, inf)", so essentially R - d, with R being the set of real numbers
            -Domain([-inf, 3]) # represents "[3, inf)"
            Domain.fromInts(2, 3, 6) # represents "[2, 4) U [6, 7)"

        You can also do arithmetic on them, and check "in" operator::

            Domain([2, 3]) + Domain([4, 5]) # represents "[2, 3) U [4, 5)"
            Domain([2, 3]) + Domain([2.9, 5]) # represents "[2, 5)", also merges overlaps
            3 in Domain([2, 3]) # returns False
            2 in Domain([2, 3]) # returns True"""
        if dontCheck: self.ranges = list(ranges); return
        # convert all to Range type, fix its order, and sort based on .start
        ranges = [(r if isinstance(r, Range) else Range(r)).fixOrder() for r in ranges]
        ranges = sorted(ranges, key=lambda r: r.start)
        # merges overlapping segments
        self.ranges = list(join(ranges, []))

    @staticmethod
    def fromInts(*ints:List[int]):
        """Returns a new :class:`Domain` which has ranges [i, i+1] for each
        int given."""
        return Domain(*(Range(i, i+1) for i in ints))

    def copy(self): return Domain(*(r.copy() for r in self.ranges))

    def intIter(self, step:int=1, start:int=0):
        r"""Yields ints in all ranges of this domain. If first range's domain
        is :math:`(-\infty, a)`, then starts at the specified integer"""
        for r in self.ranges:
            x = int(start) if r.start == -inf else int(r.start)
            while x < r.stop: yield x; x += step

    def __neg__(self): return Domain(*neg(self.ranges), dontCheck=True)
    def __add__(self, domain): return Domain(*(r.copy() for r in join(self.ranges, domain.ranges)), dontCheck=True)
    def __sub__(self, domain): return self + (-domain)

    def __eq__(self, domain):
        # objects without `.ranges` are simply not equal, instead of crashing
        try: otherRanges = domain.ranges
        except AttributeError: return NotImplemented
        return self.ranges == otherRanges

    def __str__(self):
        # str.join() only accepts strings, so stringify each Range first
        return f"Domain: {', '.join(str(r) for r in self.ranges)}"

    def __contains__(self, x): return any(x in r for r in self.ranges)

    def __repr__(self):
        rs = '\n'.join(f"- {r}" for r in self.ranges)
        return f"""Domain:\n{rs}\n\nCan:
- 3 in d: check whether a number is in this domain or not
- d1 + d2: joins 2 domain
- -d: excludes the domain from R
- d1 - d2: same as d1 + (-d2)"""
class AutoIncrement:
    def __init__(self, initialValue:int=-1, n:int=float("inf"), prefix:str=None):
        """Creates a new AutoIncrement object. Every time the object is called
        it gets incremented by 1 automatically. Example::

            a = k1lib.AutoIncrement()
            a() # returns 0
            a() # returns 1
            a() # returns 2
            a.value # returns 2
            a.value # returns 2
            a() # returns 3

            a = AutoIncrement(n=3, prefix="cluster_")
            a() # returns "cluster_0"
            a() # returns "cluster_1"
            a() # returns "cluster_2"
            a() # returns "cluster_0"

        :param initialValue: counter value before the first call
        :param n: if specified, then will wrap around to 0 when hit this number
        :param prefix: if specified, will yield strings with specified prefix"""
        self.value = initialValue; self.n = n; self.prefix = prefix

    @staticmethod
    def random() -> "AutoIncrement":
        """Creates a new AutoIncrement object that has a random integer initial value"""
        # randint() needs real ints: floats such as 1e9 are rejected on Python 3.12+
        return AutoIncrement(random.randint(0, 10**9))

    @property
    def value(self):
        """Get the value as-is, without auto incrementing it"""
        if self.prefix is None: return self._value
        return f"{self.prefix}{self._value}"

    @value.setter
    def value(self, value): self._value = value

    def __call__(self):
        """Increments internal counter, and return it."""
        self._value += 1
        if self._value >= self.n: self._value = 0
        return self.value
class Wrapper:
    value:Any
    """Internal value of this :class:`Wrapper`"""

    def __init__(self, value):
        """Wraps some value; call the wrapper to get the value back.
        Example::

            a = k1lib.Wrapper(list(range(int(1e7))))
            # returns [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
            a()[:10]

        This exists just so that Jupyter Lab's contextual help won't automatically
        display the (possibly humongous) value. Could be useful if you want to pass a
        value by reference everywhere like this::

            o = k1lib.Wrapper(None)
            def f(obj):
                obj.value = 3
            f(o)
            o() # returns 3"""
        self.value = value

    def __call__(self):
        """Returns the wrapped value."""
        wrapped = self.value
        return wrapped
class Every:
    def __init__(self, n):
        """Returns True once every ``n`` calls, starting with the first call.
        Example::

            e = k1lib.Every(4)
            e() # returns True
            e() # returns False
            e() # returns False
            e() # returns False
            e() # returns True"""
        self.i = -1 # so that the very first call lands on count 0
        self.n = n

    def __call__(self) -> bool:
        """Advances the internal count, then reports :attr:`value`."""
        self.i += 1
        return self.value

    @property
    def value(self) -> bool:
        """True when the current count is a multiple of ``n``."""
        return not (self.i % self.n)
class RunOnce:
    def __init__(self):
        """Flag that reads False the first time it's checked, True afterwards.
        Example::

            r = k1lib.RunOnce()
            r.done() # returns False
            r.done() # returns True
            r.done() # returns True
            r.revert()
            r.done() # returns False
            r.done() # returns True
            r.done() # returns True

        May be useful in situations like::

            class A:
                def __init__(self):
                    self.ro = k1lib.RunOnce()
                def f(self, x):
                    if self.ro.done(): return 3 + x
                    return 5 + x
            a = A()
            a.f(4) # returns 9
            a.f(4) # returns 7"""
        self.value = False

    def done(self):
        """Whether this has been called once before. Calling it marks the
        object as done."""
        before, self.value = self.value, True
        return before

    def __call__(self):
        """Alias of :meth:`done`."""
        return self.done()

    def revert(self):
        """Resets the flag, so the next check reads False again."""
        self.value = False
class MaxDepth:
    def __init__(self, maxDepth:int, depth:int=0):
        """Small helper for capping how deep a graph traversal may go.
        Example::

            def f(d):
                print(d.depth)
                if d: f(d.enter())
            # prints "0\\n1\\n2\\n3"
            f(k1lib.MaxDepth(3))

        Of course, this might look unpleasant to the end user, so this is more
        likely for internal tools.

        :param maxDepth: depth at which the object becomes falsy
        :param depth: current depth"""
        self.depth = depth
        self.maxDepth = maxDepth

    def enter(self) -> "MaxDepth":
        """Returns a new object that is one level deeper. This object is left
        unchanged."""
        deeper = self.depth + 1
        return MaxDepth(self.maxDepth, deeper)

    def __bool__(self):
        """True while the max depth hasn't been reached yet."""
        return self.maxDepth > self.depth

    def __call__(self):
        """Alias of :meth:`__bool__`."""
        return bool(self)
class MovingAvg:
    def __init__(self, initV:float=0, alpha=0.9, debias=False):
        """Exponential moving average, for smoothing out sequential data.
        Example::

            a = k1lib.MovingAvg(5)
            a(3).value # returns 4.8, because 0.9*5 + 0.1*3 = 4.8
            a(3).value # returns 4.62

        Difference between normal and debias modes::

            x = torch.linspace(0, 10, 100); y = torch.cos(x) | op().item().all() | deref()
            plt.plot(x, y);
            a = k1lib.MovingAvg(debias=False); plt.plot(x, y | apply(lambda y: a(y).value) | deref())
            a = k1lib.MovingAvg(debias=True); plt.plot(x, y | apply(lambda y: a(y).value) | deref())
            plt.legend(["Signal", "Normal", "Debiased"])

        .. image:: images/movingAvg.png

        As you can see, normal mode still has the influence of the initial value at
        0 and can't rise up fast, whereas the debias mode will ignore the initial
        value and immediately snaps to the first saved value.

        :param initV: initial value
        :param alpha: number in [0, 1]. Basically how much to keep old value?
        :param debias: whether to debias the initial value"""
        self.alpha = alpha
        self.debias = debias
        self.value = initV
        self.m = initV # raw (biased) running average
        self.t = 0 # number of updates so far, only tracked in debias mode

    def __call__(self, value):
        """Feeds in a new value, updates :attr:`value` and returns this object."""
        keep = self.alpha
        self.m = self.m * keep + value * (1 - keep)
        if not self.debias:
            self.value = self.m
            return self
        self.t += 1
        self.value = self.m / (1 - keep**self.t)
        return self

    # arithmetic acts on the current average and gives back plain numbers
    def __add__(self, o): return self.value + o
    def __radd__(self, o): return o + self.value
    def __sub__(self, o): return self.value - o
    def __rsub__(self, o): return o - self.value
    def __mul__(self, o): return self.value * o
    def __rmul__(self, o): return o * self.value
    def __truediv__(self, o): return self.value / o
    def __rtruediv__(self, o): return o / self.value

    def __repr__(self):
        return f"Moving average: {self.value}, alpha: {self.alpha}"
# Name of the flag attribute that switches Absorber.__setattr__ between really
# setting attributes (True) and recording the assignment as a step (False).
sen = "_ab_sentinel"
class Absorber:
    """Creates an object that absorbes every operation done on it. Could be
    useful in some scenarios::

        ab = k1lib.Absorber()
        # absorbs all operations done on the object
        abs(ab[::3].sum(dim=1))
        t = torch.randn(5, 3, 3)
        # returns transformed tensor of size [2, 3]
        ab.ab_operate(t)

    Another::

        ab = Absorber()
        ab[2] = -50
        # returns [0, 1, -50, 3, 4]
        ab.ab_operate(list(range(5)))

    Because this object absorbs every operation done on it, you have to be gentle with
    it, as any unplanned disturbances might throw your code off. Best to create a new
    one on the fly, and pass them immediately to functions, because if you're in a
    notebook environment like Jupyter, it might poke at variables.

    For extended code example that utilizes this, check over :class:`k1lib.cli.modifier.op`
    source code."""

    def __init__(self, initDict:dict=None):
        """Creates a new Absorber.

        :param initDict: initial variables to set, as setattr operation is normally absorbed"""
        self._ab_sentinel = True
        # each step is [description, function]; description is [operation name, args...]
        self._ab_steps = []
        if initDict is not None:
            for k, v in initDict.items(): setattr(self, k, v)
        self._ab_sentinel = False

    def ab_operate(self, x):
        """Special method to actually operate on an object and get the result. Not
        absorbed. Example::

            # returns 6
            (op() * 2).ab_operate(3)"""
        for desc, step in self._ab_steps: x = step(x)
        return x

    def ab_fastF(self):
        """Returns a function that operates on the input (just like :meth:`ab_operate`),
        but much faster, suitable for high performance tasks. Example::

            f = (k1lib.Absorber() * 2).ab_fastF()
            # returns 6
            f(3)"""
        # hand-unrolled compositions for up to 5 steps, to skip the loop in ab_operate
        s = self._ab_steps; l = len(s)
        if l == 0: return lambda x: x
        if l == 1: return s[0][1]
        if l == 2:
            a, b = s[0][1], s[1][1]
            return lambda x: b(a(x))
        if l == 3:
            a, b, c = s[0][1], s[1][1], s[2][1]
            return lambda x: c(b(a(x)))
        if l == 4:
            a, b, c, d = s[0][1], s[1][1], s[2][1], s[3][1]
            return lambda x: d(c(b(a(x))))
        if l == 5:
            a, b, c, d, e = s[0][1], s[1][1], s[2][1], s[3][1], s[4][1]
            return lambda x: e(d(c(b(a(x)))))
        return self.ab_operate

    def __getattr__(self, idx):
        # underscore names are never absorbed, so missing internals fail normally
        if isinstance(idx, str) and idx.startswith("_"): raise AttributeError()
        self._ab_steps.append([["__getattr__", idx], lambda x: getattr(x, idx)]); return self

    def __setattr__(self, k, v):
        """Only allows legit variable setting when '_ab_sentinel' is True. Absorbs
        operations if it's False."""
        if k == sen: self.__dict__[k] = v
        else:
            if self.__dict__[sen]: self.__dict__[k] = v
            else:
                def f(x): setattr(x, k, v); return x
                self._ab_steps.append([["__setattr__", [k, v]], f])
        return self

    def __getitem__(self, idx):
        self._ab_steps.append([["__getitem__", idx], lambda x: x[idx]]); return self

    def __setitem__(self, k, v):
        def f(x): x[k] = v; return x
        self._ab_steps.append([["__setitem__", [k, v]], f]); return self

    def __call__(self, *args, **kwargs):
        self._ab_steps.append([["__call__", [args, kwargs]], lambda x: x(*args, **kwargs)]); return self

    def __len__(self): self._ab_steps.append([["__len__"], lambda x: len(x)]); return self
    def __add__(self, o): self._ab_steps.append([["__add__", o], lambda x: x+o]); return self
    def __radd__(self, o): self._ab_steps.append([["__radd__", o], lambda x: o+x]); return self
    def __sub__(self, o): self._ab_steps.append([["__sub__", o], lambda x: x-o]); return self
    def __rsub__(self, o): self._ab_steps.append([["__rsub__", o], lambda x: o-x]); return self
    def __mul__(self, o): self._ab_steps.append([["__mul__", o], lambda x: x*o]); return self
    def __rmul__(self, o): self._ab_steps.append([["__rmul__", o], lambda x: o*x]); return self
    def __matmul__(self, o): self._ab_steps.append([["__matmul__", o], lambda x: x@o]); return self
    def __rmatmul__(self, o): self._ab_steps.append([["__rmatmul__", o], lambda x: o@x]); return self
    def __truediv__(self, o): self._ab_steps.append([["__truediv__", o], lambda x: x/o]); return self
    def __rtruediv__(self, o): self._ab_steps.append([["__rtruediv__", o], lambda x: o/x]); return self
    def __floordiv__(self, o): self._ab_steps.append([["__floordiv__", o], lambda x: x//o]); return self
    def __rfloordiv__(self, o): self._ab_steps.append([["__rfloordiv__", o], lambda x: o//x]); return self
    def __mod__(self, o): self._ab_steps.append([["__mod__", o], lambda x: x%o]); return self
    def __rmod__(self, o): self._ab_steps.append([["__rmod__", o], lambda x: o%x]); return self
    def __pow__(self, o): self._ab_steps.append([["__pow__", o], lambda x: x**o]); return self
    def __rpow__(self, o): self._ab_steps.append([["__rpow__", o], lambda x: o**x]); return self
    def __lshift__(self, o): self._ab_steps.append([["__lshift__", o], lambda x: x<<o]); return self
    def __rlshift__(self, o): self._ab_steps.append([["__rlshift__", o], lambda x: o<<x]); return self
    def __rshift__(self, o): self._ab_steps.append([["__rshift__", o], lambda x: x>>o]); return self
    def __rrshift__(self, o): self._ab_steps.append([["__rrshift__", o], lambda x: o>>x]); return self
    def __and__(self, o): self._ab_steps.append([["__and__", o], lambda x: x&o]); return self
    def __rand__(self, o): self._ab_steps.append([["__rand__", o], lambda x: o&x]); return self
    def __xor__(self, o): self._ab_steps.append([["__xor__", o], lambda x: x^o]); return self
    def __rxor__(self, o): self._ab_steps.append([["__rxor__", o], lambda x: o^x]); return self
    def __or__(self, o): self._ab_steps.append([["__or__", o], lambda x: x|o]); return self
    def __ror__(self, o): self._ab_steps.append([["__ror__", o], lambda x: o|x]); return self
    def __lt__(self, o): self._ab_steps.append([["__lt__", o], lambda x: x<o]); return self
    def __le__(self, o): self._ab_steps.append([["__le__", o], lambda x: x<=o]); return self
    def __eq__(self, o): self._ab_steps.append([["__eq__", o], lambda x: x==o]); return self
    def __ne__(self, o): self._ab_steps.append([["__ne__", o], lambda x: x!=o]); return self
    def __gt__(self, o): self._ab_steps.append([["__gt__", o], lambda x: x>o]); return self
    def __ge__(self, o): self._ab_steps.append([["__ge__", o], lambda x: x>=o]); return self
    def __neg__(self): self._ab_steps.append([["__neg__"], lambda x: -x]); return self
    def __pos__(self): self._ab_steps.append([["__pos__"], lambda x: +x]); return self
    def __abs__(self): self._ab_steps.append([["__abs__"], lambda x: abs(x)]); return self
    def __invert__(self): self._ab_steps.append([["__invert__"], lambda x: ~x]); return self

    def ab_int(self):
        """Replacement for ``int(ab)``, as that requires returning an actual :class:`int`."""
        self._ab_steps.append([["__int__"], lambda x: int(x)]); return self

    # Delegates to ab_int(). The old `self.int()` went through __getattr__ and
    # recorded a bogus `.int` lookup plus a call instead of the conversion.
    def __int__(self): return self.ab_int()

    def ab_float(self):
        """Replacement for ``float(ab)``, as that requires returning an actual :class:`float`."""
        self._ab_steps.append([["__float__"], lambda x: float(x)]); return self

    # Delegates to ab_float(), for the same reason as __int__ above.
    def __float__(self): return self.ab_float()

    def ab_str(self):
        """Replacement for ``str(ab)``, as that requires returning an actual :class:`str`."""
        self._ab_steps.append([["__str__"], lambda x: str(x)]); return self

    def ab_len(self):
        """Replacement for ``len(ab)``, as that requires returning an actual :class:`int`."""
        self._ab_steps.append([["__len__"], lambda x: len(x)]); return self

    def ab_contains(self, key):
        """Replacement for ``key in ab``, as that requires returning an actual :class:`bool`."""
        self._ab_steps.append([["__contains__", key], lambda x: key in x]); return self
# Zero-width space (U+200B). Settings.__repr__ puts it between each setting's
# value and its docs, then splits every line on it again for pretty printing.
sep = "\u200b" # weird separator, guaranteed (mostly) to not appear anywhere in the
# settings, so that I can pretty print it
class Settings:
    def __init__(self, **kwargs):
        """Creates a new settings object, which is basically a fancier :class:`dict`
        with attribute access, per-key docs and change callbacks.
        Example::

            s = k1lib.Settings(a=3, b="42")
            s.c = k1lib.Settings(d=8)
            s.a # returns 3
            s.b # returns "42"
            s.c.d # returns 8
            print(s) # prints nested settings nicely"""
        # while the sentinel is True, __setattr__ stores values without firing callbacks
        self._setattr_sentinel = True
        for name, value in kwargs.items():
            setattr(self, name, value)
        self._docs = dict()
        self._cbs = dict()
        self._setattr_sentinel = False

    @contextlib.contextmanager
    def context(self, **kwargs):
        """Context manager that temporarily modifies some settings. Everything,
        sub-settings included, is put back to its old value on exit. Example::

            s = k1lib.Settings(a=3, b="42", c=k1lib.Settings(d=8))
            with s.context(a=4):
                s.c.d = 20
                s.a # returns 4
                s.c.d # returns 20
            s.a # returns 3
            s.c.d # returns 8

        :raises RuntimeError: if a given keyword is not an existing setting"""
        snapshot = dict(self.__dict__)
        unknown = next((k for k in kwargs if k not in snapshot), None)
        if unknown is not None:
            raise RuntimeError(f"'{unknown}' settings not found!")
        try:
            with contextlib.ExitStack() as stack:
                # every sub-settings object takes its own snapshot too
                for _, child in self._subSettings():
                    stack.enter_context(child.context())
                for name, value in kwargs.items():
                    setattr(self, name, value)
                yield
        finally:
            # goes through setattr(), so registered callbacks fire on restore
            for name, value in snapshot.items():
                setattr(self, name, value)

    def add(self, k:str, v:Any, docs:str="", cb:Callable[["Settings", Any], None]=None) -> "Settings":
        """Long way to add a variable. The upside is that you can attach
        documentation and a change callback to it. Example::

            s = k1lib.Settings()
            s.add("a", 3, "some docs")
            print(s) # displays the extra docs

        :param cb: callback that takes in (settings, new value) if any property changes
        :return: this object, to allow chaining"""
        setattr(self, k, v)
        self._docs[k] = docs
        self._cbs[k] = cb
        return self

    def _docsOf(self, k:str):
        """Docs registered for key ``k`` as a string, or "" if there are none."""
        if k in self._docs:
            return f"{self._docs[k]}"
        return ""

    def _subSettings(self) -> List[Tuple[str, "Settings"]]:
        """(name, value) pairs of public attributes that are :class:`Settings`."""
        pairs = []
        for name, value in self.__dict__.items():
            if name.startswith("_"): continue
            if isinstance(value, Settings): pairs.append((name, value))
        return pairs

    def _simpleSettings(self) -> List[Tuple[str, Any]]:
        """(name, value) pairs of public attributes that are not :class:`Settings`."""
        pairs = []
        for name, value in self.__dict__.items():
            if name.startswith("_"): continue
            if not isinstance(value, Settings): pairs.append((name, value))
        return pairs

    def __setattr__(self, k, v):
        self.__dict__[k] = v
        # no callbacks for the sentinel itself, nor while it's switched on
        if k == "_setattr_sentinel" or self._setattr_sentinel: return
        cb = self._cbs.get(k)
        if cb is not None: cb(self, v)

    def __repr__(self):
        """Lists simple settings first, then sub-settings (nested), each with
        its docs."""
        publicKeys = [k for k in self.__dict__ if not k.startswith("_")]
        kSpace = max([1, *(publicKeys | k1lib.cli.lengths())])
        pieces = ["Settings:\n"]
        for k, v in self._simpleSettings():
            pieces.append(f"- {k.ljust(kSpace)} = {k1lib.limitChars(str(v), settings.displayCutoff)}{sep}{self._docsOf(k)}\n")
        for k, v in self._subSettings():
            # drop the child's "Settings:" header and trailing line, then tab the rest
            childLines = v.__repr__().split("\n")[1:-1]
            sub = k1lib.tab("\n".join(childLines), " ")
            pieces.append(f"- {k.ljust(kSpace)} = <Settings>{sep}{self._docsOf(k)}\n" + sub + "\n")
        s = "".join(pieces)
        return s.split("\n") | k1lib.cli.op().split(sep).all() | k1lib.cli.pretty(sep) | k1lib.cli.join("\n")
# Internal settings object; holds flags for k1lib's own tests.
_settings = Settings().add("test", Settings().add("bio", True, "whether to test bioinformatics clis that involve strange command line tools like samtools and bwa"))
# Global, user-facing settings object.
settings = Settings().add("displayCutoff", 50, "cutoff length when displaying a Settings object")
settings.add("svgScale", 0.7, "default svg scales for clis that displays graphviz graphs")
def _cb_wd(s, p):
    """Callback for the ``wd`` setting: changes the process' working directory
    to ``p`` (user-expanded and made absolute), then stores the normalized
    path. ``None`` is stored as-is, without changing directory."""
    if p is not None: p = os.path.abspath(os.path.expanduser(p)); _oschdir(p)
    # written straight into __dict__ so that this callback isn't triggered again
    s.__dict__["wd"] = p
def oschdir(path):
    # replacement for os.chdir: routes through the setting, whose callback does the real chdir
    settings.wd = path
# If this module is executed a second time (e.g. importlib.reload), os.chdir is
# already our wrapper. Capturing it as the "original" would make the next chdir
# recurse forever, so recover the real function from the marker attribute instead.
_oschdir = getattr(os.chdir, "_k1lib_original", os.chdir)
oschdir._k1lib_original = _oschdir
os.chdir = oschdir; os.chdir.__doc__ = _oschdir.__doc__
settings.add("wd", os.getcwd(), "default working directory, will get from `os.getcwd()`. Will update using `os.chdir()` automatically when changed", _cb_wd)
settings.add("cancelRun_newLine", True, "whether to add a new line character at the end of the cancel run/epoch/batch message")