# AUTOGENERATED FILE! PLEASE DON'T EDIT
from k1lib.imports import *
# Pipeline constant: pipe a folder path in, get its total size in bytes out.
# Splits entries into subfolders vs files, recurses into subfolders (tryout(0)
# yields 0 for unreadable entries), stats files directly, then sums everything.
getFolderSize = ls() | filt(os.path.isdir).split() | apply(lambda x: x | (tryout(0) | getFolderSize)) + apply(os.path.getsize) | toSum().all() | toSum() | deref()
# Pipeline constant: pipe a folder path in, get a stream of full paths of every
# file inside it (recursively), built from os.walk's (dirpath, filenames) pairs
getFilesInFolder = aS(os.walk) | cut(0, 2) | ungroup(True, True) | join(os.sep).all()
def getIr(base):
    """Builds the "intermediate representation" used by the balancing functions.
    On every node, lists the entries directly inside ``base`` together with their
    size (file size, or recursive folder size; 0 on error), then flattens into
    rows ``[idx, nodeId, path, size]`` with a global index column prepended."""
    return None | applyCl.aS(lambda: ls(base) | iden() & apply(lambda x: x | (tryout(0) | (aS(os.path.getsize) if os.path.isfile(x) else getFolderSize))) | transpose() | deref()) | ungroup(begin=True) | insertIdColumn(True) | deref()
def normalize(d):
    """Normalizes ``[key, value]`` rows so the values sum to 1, then sorts the
    rows by key. Note: raises ZeroDivisionError if all values are 0."""
    d = d | deref(); s = d | cut(1) | toSum()
    return d | apply(op()/s, 1) | sort(0, False) | deref()
@lru_cache
def statsCpu():
    """Returns ``[cpu, cpuF]``: per-node cpu counts ``List[nodeId, cpu]`` sorted
    by node id, and the normalized ``List[nodeId, cpu fraction]``. Cached, since
    the cluster's cpu topology doesn't change within a session."""
    cpu = None | applyCl.aS(applyCl.cpu) | sort(0, False) | deref()
    cpuF = None | applyCl.aS(applyCl.cpu) | aS(normalize) # "cpuF" = cpu fraction. List[nodeId, cpu fraction]
    return [cpu, cpuF]
@lru_cache
def statsNodeId():
    """Cached list of all node ids in the cluster"""
    return applyCl.nodeIds()
# Seed dict mapping every nodeId to a zero size, so that nodes holding no files
# still show up (with fraction 0) in stats() below
_statsS1 = statsNodeId() | apply(wrapList() | insert(0, False)) | toDict()
def stats(ir):
    """Returns ``[cpu, cpuF, sizeF]`` for the given ir: per-node cpu counts, cpu
    fractions and data-size fractions. Nodes with no files get size fraction 0."""
    cpu, cpuF = statsCpu()
    # size fraction. List[nodeId, size fraction]. _statsS1 seeds every node with
    # size 0 so that nodes without any files still appear in the result
    sizeF = normalize({**_statsS1, **ir | groupBy(1, True) | apply(cut(2) | toSum(), 1) | toDict()}.items())
    return cpu, cpuF, sizeF
def scores(ir):
    """Per-node balance score ``sizeFraction - cpuFraction``. Positive means the
    node holds more data than its cpu share warrants; can be negative."""
    cpu, cpuF, sizeF = stats(ir)
    return [cpuF, sizeF] | cut(1).all() | transpose() | ~apply(lambda c,s: s-c) | deref() # negpos
def score(ir):
    """Scalar imbalance score: sum of squared per-node scores. Always >= 0, lower is better."""
    return scores(ir) | apply(lambda x: abs(x)**2) | toSum() # pos
def move(ir, nA:str, nB:str, idx:int):
    """Returns a copy of ``ir`` where the file at row ``idx`` is reassigned to
    node ``nB``. Pure planning step: no data is actually transferred.

    ``nA`` (the source node) is not used by the body; kept for interface symmetry
    with the move records produced by :func:`optimize`."""
    ir1 = ir | deref(); ir1[idx][1] = nB
    return ir1
def optimize(ir):
    """One greedy balancing step: picks the most overloaded node (a) and the most
    underloaded node (b), then moves the single file from a to b that minimizes
    the resulting imbalance. Returns ``(newIr, [nA, nB, idx, newScore])``."""
    cpu, cpuF, sizeF = stats(ir); scs = scores(ir); a = np.argmax(scs); b = np.argmin(scs)
    nodeIds = applyCl.nodeIds() | sort(None, False) | deref()
    # [nodeId, List[row]] for every node; seeded with empty lists so nodes
    # currently holding no files still get an entry, sorted to line up with nodeIds
    files = {**nodeIds | apply(wrapList() | insert([], False)) | toDict(), **ir | groupBy(1, True) | toDict()}.items() | sort(0, False) | deref()
    fA = files[a]; fB = files[b] # files A. [nodeId, List[idx, url, size]]
    nA = nodeIds[a]; nB = nodeIds[b] # A node id
    sA, sB = [fA, fB] | apply(op()[1] | cut(2) | deref() | aS(np.array, dtype=int)) # file sizes A
    # spA[i]/spB[i]: bytes left on A / present on B after hypothetically moving file i
    spA = sA.sum() - sA; spB = sB.sum() + sA # sum prime A, array[files]
    sp = spA + spB; sfA = spA/sp; sfB = spB/sp
    cA = cpu[a][1]; cB = cpu[b][1] # cpu A
    c = cA + cB; cfA = cA/c; cfB = cB/c # cpu fraction A
    exp = 5 # intuition says that exp should be even. But that doesn't work. Odd values work tho, but I have no idea why
    # pick the candidate file whose move best aligns size fractions with cpu fractions
    idx = fA[1][((sfA-cfA)**exp + (sfB-cfB)**exp).argmin()][0]
    ir2 = move(ir, nA, nB, idx)
    return ir2, [nA, nB, idx, score(ir2)]
def traj(ir, maxSteps=20):
    """Runs :func:`optimize` repeatedly, collecting the intermediate irs and move
    records, and stops early once a step would worsen the score. Pass
    ``maxSteps=None`` for (effectively) unlimited steps. Returns ``(irs, auxs)``."""
    sc = score(ir); auxs = []; irs = []
    maxSteps = maxSteps if maxSteps is not None else int(1e10)
    for i in range(maxSteps):
        ir, aux = optimize(ir)
        if aux[3] > sc: break # score got worse -> converged, discard this step
        irs.append(ir); auxs.append(aux); sc = aux[3]
    return irs, auxs
def collapse(it):
    """Collapses a chain of moves of the same file into one move from the first
    source to the last destination; returns ``[]`` if that would be a no-op
    (file ends up back where it started)."""
    a, b = it | rows(0, -1)
    c = [a[0], b[1], a[2], b[3]] # [firstSrc, lastDst, fileIdx, lastScore]
    if c[0] == c[1]: return []
    return [c]
def traj2(ir, maxSteps=20):
    """Like :func:`traj`, but collapses multi-hop moves of the same file into
    single moves and translates file indexes back into file names. Returns rows
    ``[srcNodeId, dstNodeId, fileName, score]``."""
    idx2FileName = ir | apply(lambda arr: [arr[0], arr[2]]) | toDict()
    a = traj(ir, maxSteps)[1] | groupBy(2) | filt(lambda x: len(x) > 1).split() | (apply(collapse)) + iden() | joinStreams(2) | deref()
    return a | lookup(idx2FileName, 2) | deref()
def moveFile(fileName:str, destNodeId:str, timeout=60):
    """Moves file from the current node to the destination node. Usually executed on other nodes than the driver node"""
    fn = os.path.expanduser(fileName); dirname = os.path.dirname(fn)
    # prepare destination: create parent folder, remove any stale copy
    [destNodeId] | applyCl.aS(lambda: None | cmd(f"mkdir -p {dirname}; rm -f {fn}") | deref(), timeout=timeout) | deref()
    # stream the file over chunk by chunk, appending remotely (each chunk is sent
    # and deref'd before the next iteration, so the closure over `chunk` is safe)
    for chunk in cat(fn, False, True): [destNodeId] | applyCl.aS(lambda: chunk >> file(fn), timeout=timeout) | deref()
    None | cmd(f"rm {fn}") | deref() # finally delete the local copy
def moveFF(ff:str, destNodeId:str, timeout=60):
    """Moves file or folder from the current node to the destination node"""
    if os.path.isfile(ff): return moveFile(ff, destNodeId, timeout)
    # it's a folder: move every file inside, then delete the local tree.
    # rm is best-effort (ignore()) since the files themselves are already gone
    ff | getFilesInFolder | apply(aS(moveFile, destNodeId, timeout)) | deref()
    None | cmd(f"rm -rf {ff}") | ignore()
def balanceFolder(base, audit=False, maxSteps=20): # currently executing each move step serially, will change in the future if it's too slow
    """Rebalances the entries under ``base`` across the cluster so each node's
    data is proportional to its cpu count.
    :param audit: if True, only returns the planned moves without executing them
    :param maxSteps: max number of greedy balancing steps (None = unlimited)"""
    applyCl.cmd(f"mkdir -p {base}") # make sure base exists on every node
    tr = traj2(getIr(base), maxSteps)
    if audit: return tr
    # execute each planned move [src, dst, fileName, score] on its source node
    tr | apply(lambda arr: [arr[0], arr]) | ~applyCl(lambda a,b,fn,sc: moveFF(fn, b), pre=True, timeout=60) | deref()
def getSize(url):
    """Returns the size in bytes of the file at ``url``, read from a HEAD
    request's Content-Length header (case-insensitive). Retries up to 10 times
    before raising."""
    for i in range(10):
        try: return requests.head(url, timeout=3).headers.items() | apply(op().lower(), 0) | toDict() | op()["content-length"].ab_int()
        except Exception as e:
            # keep retrying; chain the last underlying error for debuggability
            if i == 9: raise Exception("Can't get size of file") from e
class NoPartialContent(Exception):
    """Raised when a server doesn't honor HTTP Range requests (no 206 response)"""
    pass
def getChunk(url:str, sB:int, eB:int, timeout:float) -> bytes:
    """Downloads bytes ``sB`` to ``eB`` (inclusive) of ``url`` via an HTTP Range
    request. Retries up to 10 times on network errors.
    Raises :class:`NoPartialContent` if the server doesn't answer with 206."""
    for i in range(10):
        try: res = requests.get(url, headers={"Range": f"bytes={sB}-{eB}"}, timeout=timeout)
        except Exception as e:
            if i == 9: raise Exception("Can't get file chunk") from e
            continue
        # non-206 means the server ignored the Range header - not retryable
        if res.status_code != 206: raise NoPartialContent("Server doesn't allow partial downloads at this particular url")
        return res.content
def getChunks(url:str, sB:int, eB:int, chunkSize=None, chunkTimeout:float=10) -> List[bytes]:
    """Grabs bytes from sB to eB (inclusive) in chunks of ``chunkSize`` bytes
    (defaults to ``settings.cli.cat.chunkSize``). Returns a lazy stream of
    chunks, each fetched with :func:`getChunk`."""
    chunkSize = chunkSize or settings.cli.cat.chunkSize
    return range(sB, eB+1) | batched(chunkSize, True) | apply(lambda r: getChunk(url, r.start, r.stop-1, chunkTimeout))
def download(url:str, folder:str, merge:bool=False, timeout=120, chunkTimeout=5):
    """Downloads ``url`` in parallel across all cluster nodes. The byte range is
    split into one slice per cpu thread, each saved as ``{folder}/{i}.bin`` on
    the node that fetched it. With ``merge=True``, all pieces are then
    concatenated (in order) into the single file ``folder`` on the driver node.
    Requires the server to support HTTP Range requests."""
    getChunk(url, 0, 1, 10) # try to see if server accepts partial downloads first
    folder = os.path.expanduser(folder); dirname = os.path.dirname(folder)
    # when merging, remember the final file name and download into a temp folder instead
    if merge: destFile = folder; folder = b"" | file(); None | cmd(f"rm -rf {folder}") | ignore(); None | cmd(f"mkdir -p {folder}") | ignore()
    applyCl.cmd(f"rm -rf {folder}"); applyCl.cmd(f"mkdir -p {folder}"); size = getSize(url)
    cpus = None | applyCl.aS(lambda: applyCl.cpu()) | deref(); n = cpus | cut(1) | toSum()
    # one task per cpu thread; rows end up as [nodeId, [byteRange, pieceFileName]]
    tasks = [cpus | ~apply(lambda x,y: [x]*y) | joinStreams(), range(size) | splitW(*[1]*n)] | transpose() | insertIdColumn(True, False) | ~apply(lambda x,y,z: [x,[y,f"{folder}/{z}.bin"]]) | deref(1)
    tasks | ~applyCl(lambda r, fn: getChunks(url, r.start, r.stop-1, None, chunkTimeout) | file(fn), pre=True, timeout=timeout) | deref()
    if merge:
        None | cmd(f"rm -rf {destFile}") | deref()
        None | cmd(f"mkdir -p {dirname}") | deref()
        # pull every piece from every node in numeric (.bin index) order and concatenate
        None | applyCl.aS(lambda: ls(folder)) | ungroup(True, True) | deref() | sortF(op().split(".bin")[0].split("/")[-1].ab_int(), 1) | applyCl(cat(text=False), pre=True) | cut(1) | file(destFile)
        None | cmd(f"rm -rf {folder}") | deref()
def a_transfer(fn, nse, nodeB, rpF:callable=iden()):
    """Transfers a lot of blocks from a bunch of nodes to nodeB. Does not delete from those node though
    nse = List[nodeAId, [sB, eB]]
    Runs on driver process, blocks, so better use applyTh outside of this
    :param rpF: ray progress function, called with the completed fraction (0..1)"""
    blockSize = settings.cli.cat.chunkSize
    def inner():
        # runs on nodeB: pulls byte blocks from the source nodes (prefetched) and
        # appends each to fn, reporting progress as it goes
        totalBytes = nse | cut(1) | ~apply(lambda x,y:y-x) | toSum(); currentByte = 0
        for chunk in nse | ~apply(lambda x, y: range(x, y) | batched(blockSize, True) | apply("[x.start, x.stop]"), 1) | ungroup(True, True) | deref()\
            | ~applyCl(lambda sB, eB: cat(fn, False, sB=sB, eB=eB), pre=True, timeout=None, prefetch=20) | cut(1):
            chunk >> file(fn); currentByte += len(chunk); rpF(currentByte/totalBytes)
    [nodeB] | applyCl.aS(inner, timeout=None) | item()
def decommission(fn:str, nodeAs:List[str], nodeBs:List[str], rS=iden()):
    """Spreads out a particular file in nodeAs to all nodeBs, to prepare
    to decomission nodeAs. The 2 sets should be mutually exclusive
    :param rS: instance of refineSeek"""
    nodeAs, nodeBs = [nodeAs, nodeBs] | deref()
    if len(nodeAs) == 0: return # nothing to decommission
    if len(nodeBs) == 0: raise Exception("Unsupported configuration! Trying to move data from A+B to C+D. Has to have some shared nodes, like moving data from A+B+C to B+C+D. This is not a fundamental limitation, but just can't be done with the current architecture. Might be fixed in the future.")
    # some initial metadata
    nodeIds = applyCl.nodeIds(); nodeId_cpu = nodeIds | applyCl.aS(lambda: applyCl.cpu()) | deref(); nodeId2Cpu = nodeId_cpu | toDict()
    ws = nodeId_cpu | inSet(nodeBs, 0) | cut(1) | deref() # weights to split files on nodeAs into
    # splitting file on nodeAs into chunks first, to plan things out
    a = nodeAs | applyCl.aS(lambda: fn | splitSeek(ws=ws) | rS | window(2) | deref() | insertColumn(nodeBs) | insert(applyCl.nodeId()).all() | deref()) | cut(1) | joinStreams() | deref()
    # actually transferring chunks, one concurrent a_transfer per destination node
    with ray.progress(a | groupBy(1) | shape(0), "Decommissioning") as rp:
        c = a | groupBy(1, True) | apply(iden() + apply(lambda arr: [arr[0], arr[1:]]) | reverse() | insert(fn)) | deref()
        enumerate(c) | applyTh(~aS(lambda idx, e: a_transfer(*e, rpF=aS(lambda p: ray.get(rp.update.remote(idx, p))))), timeout=None) | deref()
    # deleting files from nodeAs
    nodeAs | applyCl.aS(lambda: None | cmd(f"rm -rf {fn}") | ignore()) | deref()
def spreadOut(fn:str, nAs:List[str], nBs:List[str], rS=iden()):
    """Spreads out a file from nodes A to B, where B fully contains A (no decomissioning).
    A and B should be mutually exclusive. Initial nodes are A, final nodes are A + B
    :param rS: instance of refineSeek"""
    # NOTE(review): rS.fn = fn mutates the default iden() instance shared across
    # calls; benign as long as fn is always re-assigned before use - confirm
    nAs, nBs = [nAs, nBs] | deref(); rS.fn = fn
    if len(nBs) == 0: return # no need to spread out
    nBs | applyCl.aS(lambda: None | cmd(f"mkdir -p {os.path.dirname(fn)}") | deref(), timeout=None) | deref()
    nBs | applyCl.aS(lambda: None | cmd(f"rm -rf {fn}") | deref(), timeout=None) | deref()
    # some initial metadata
    nodeIds = applyCl.nodeIds(); nodeId_cpu = nodeIds | applyCl.aS(lambda: applyCl.cpu()) | deref(); nodeId2Cpu = nodeId_cpu | toDict()
    sizes = nAs | applyCl.aS(lambda: os.path.getsize(fn) if os.path.exists(fn) else 0) | deref(); totalSize = sizes | cut(1) | toSum()
    ns = [*nAs, *nBs]; totalCpu = ns | lookup(nodeId2Cpu) | toSum(); bytePerCpu = totalSize/totalCpu; wsB = nBs | lookup(nodeId2Cpu) | deref()
    # prepares segments and metadata, List[nodeId, [sB, eB]], where sB and eB are the ranges of nAs that they're willing to share
    sizePost = sizes | ~apply(lambda idx, size: [idx, nodeId2Cpu[idx]/totalCpu*totalSize/size]) | deref()
    invalidNodes = sizePost | ~filt(lambda x: 0 <= x <= 1, 1) | cut(0) | deref()
    if len(invalidNodes) > 0: raise Exception(f"Unsupported configuration! These nodes have too little data to share: {invalidNodes}. This couldn't have happen using applyCl alone. Data is not corrupted, but you'll have to combine data from all files into 1 and spread them back out again.")
    inter = sizePost | ~apply(lambda idx, x: [idx, [x, 1-x]]) | applyCl(lambda ws: fn | splitSeek(ws=ws) | rS | ~head(1), pre=True) | deref() | filt(~aS(lambda x,y: y-x>0), 1) | deref()
    # actually transferring data to new nodes
    meta = inter | apply(~aS(range) | splitW(*wsB) | rS | apply(wrapList()) | insertColumn(nBs) | deref(1), 1) | ungroup(begin=True) | apply("[x.start, x.stop]", 2) | groupBy(1, True) | deref()
    with ray.progress(len(meta), "Transferring data to new nodes") as rp:
        meta | insertIdColumn(True) | applyTh(~aS(lambda idx, nB, nse: a_transfer(fn, nse, nB, rpF=aS(lambda p: ray.get(rp.update.remote(idx, p))))), timeout=None) | deref()
    # truncates the files in nAs nodes, dropping the bytes that were shipped out
    inter | ~apply(lambda idx,se: [idx,se[0]]) | applyCl(lambda sB: open(fn, 'a').truncate(sB), pre=True, timeout=None) | deref()
def balanceFile(fn:str, nAs:List[str]=None, nBs:List[str]=None, rS=iden()):
    """Rebalances the distributed file ``fn``: first decommissions nodes that hold
    data but aren't in the target set, then spreads data out to target nodes that
    don't hold any yet.
    :param nAs: nodes currently holding the file (autodetected if None)
    :param nBs: nodes that should hold the file afterwards (all nodes if None)
    :param rS: instance of refineSeek"""
    fn = os.path.expanduser(fn)
    if nAs is None: nAs = None | applyCl.aS(lambda: os.path.exists(fn)) | filt(op(), 1) | cut(0) | deref()
    if nBs is None: nBs = applyCl.nodeIds()
    decommission(fn, *nAs | inSet(nBs).split() | reverse(), rS)
    spreadOut(fn, *nBs | inSet(nAs).split(), rS)
def diskScan1(base:str) -> List[str]: # like ls(), but returns files and folders that appear at least on 2 nodes
    """``base`` is encoded as ``"{0 or 1}\ue000{path}"`` (\\ue000 is a private-use
    separator). Lists ``path`` on every node and returns the entries (in the
    same encoded form) that exist on at least 2 nodes; ``[]`` if base is a file."""
    isdir, base = base.split("\ue000")
    # isdir is the string "0" or "1"; the original `if not isdir` was always
    # falsy for non-empty strings, so convert to int before testing
    if not int(isdir): return []
    return None | applyCl.aS(lambda: base | (tryout([]) | ls() | apply(os.path.isdir) & iden() | transpose() | ~apply(lambda x,y: f"{x*1}\ue000{y}")) | deref()) | cut(1) | joinStreams() | count() | filt(op()>1, 0) | cut(1) | deref()
def diskScan2(base:str) -> Tuple[List[str], List[str]]: # returns list of distributed folders and list of distributed files
    """Recursively scans the encoded ``base``, classifying shared entries into
    distributed folders (no shared contents inside) and shared files."""
    dFolders = []; folders, files = diskScan1(base) | op().split("\ue000").all() | toInt(0) | filt(op(), 0).split() | (join("\ue000")).all(2) | deref()
    for folder in folders:
        # original code extended `files` with `fil` both here and in the else
        # branch below, duplicating entries; extend only once
        fol, fil = diskScan2(folder); dFolders.extend(fol)
        if len(fol) + len(fil) == 0: dFolders.append(folder) # no shared contents, must be a distributed folder
        else: files.extend(fil)
    return [dFolders, files]
def diskScan3(base:str):
    """Scans ``base`` across the cluster and returns ``[distributedFolders,
    sharedFiles]`` as plain paths (the \\ue000 encoding stripped)."""
    base = os.path.expanduser(base)
    return diskScan2(f"1\ue000{base}") | op().split("\ue000")[1].all(2) | deref()
def diskScan4(base:str, sortSize=True): # fully featured data
    """Scans ``base`` and returns ``[dFolders, rFiles, dFiles]``: distributed
    folders, replicated files and distributed files, each as rows
    ``[path, [sizeOnEachNode]]`` (0 where the path doesn't exist on a node),
    optionally sorted by total size descending."""
    folders, files = diskScan3(base)
    # attach per-node sizes to every folder and file
    folders = [folders, None | applyCl.aS(lambda: folders | apply(lambda x: (x | getFolderSize) if os.path.exists(x) else 0) | deref()) | cut(1) | transpose()] | transpose() | deref()
    files = [files, None | applyCl.aS(lambda: files | apply(lambda x: os.path.getsize(x) if os.path.exists(x) else 0) | deref()) | cut(1) | transpose()] | transpose() | deref()
    post = apply(~sortF(toSum(), 1)) if sortSize else iden()
    # split files by number of distinct non-zero sizes: exactly 1 means all copies
    # are identical in size -> replicated file; otherwise -> distributed file
    return [folders, files] | wrapList() + filt(filt(op() > 0) | count() | shape(0) | (op() == 1), 1).split() | joinStreams() | apply(unique(0)) | post | deref()
def diskScan5(base:str, sortSize=True): # displays it in a nice format
    """Pretty-prints :func:`diskScan4`'s result to stdout: three sections
    (distributed folders, replicated files, distributed files) with total and
    per-node sizes, followed by a short explainer for each category."""
    d4 = diskScan4(base, sortSize)
    # column headers: first 5 chars of each node id plus its thread count
    nodeNames = None | applyCl.aS(lambda: applyCl.cpu()) | apply(op()[:5], 0) | apply('f"{x} thr"', 1) | join(", ").all() | deref()
    # rows [path, totalSize, [perNodeSizes]] with header + separator rows prepended per section
    d5 = d4 | apply(~apply(lambda path, sizes: [path, sizes | toSum() | aS(fmt.size), sizes | apply(fmt.size)]) | insert(["-"*40, "-"*10, ["-"*12]*len(nodeNames)]) | insert(["", "", nodeNames])) | deref()
    ws = d5 | shape(0).all() | deref() # per-section row counts, to split the table back up after pretty()
    d6 = d5 | joinStreams() | cut(0, 1) & (cut(2) | pretty() | wrapList().all()) | transpose() | joinStreams().all() | splitW(*ws) | insert(["Path", "Total size", "Size on each node (node id and thread count)"]).all() | joinStreams() | pretty() | splitW(*ws | apply(op()+1)) | deref()
    explainers = ["\nA distributed folder is a folder that has many files and folders inside, but their names\nare all different from each other. It's managed by applyCl.balanceFolder()",
                  "\nA replicated file is a file that has been copied to multiple nodes. Size of all file\ncopies should be the same. It's managed by applyCl.replicateFile()",
                  "\nA distributed file is a file that has been split into multiple pieces and sent to other\nnodes. It's managed by applyCl.balanceFile()"]
    [d6, ["Distributed folders", "Replicated files", "Distributed files"] | (aS(lambda x: [["-"*60, x, "-"*60] | join(" ")])).all()] | transpose() | permute(1, 0) | (joinStreams() | join("\n")).all() | wrapList() | insert(explainers, False) | transpose() | join("\n").all() | join("\n"*2) | wrapList() | stdout()