# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import k1lib as k1, os
import k1lib.cli as cli
from collections import deque
from functools import lru_cache
from contextlib import contextmanager
__all__ = ["sql", "s3", "s3bucket"]
def mysqlCnfText(user, password, host, port) -> str:
    """Renders the text of a MySQL option file containing a single ``[client]`` section.

    MySQL processes backslash escape sequences (``\\n``, ``\\t``, ``\\\\``, ...) inside
    option values, so backslashes are doubled here to make them survive literally.
    NOTE(review): a value containing ``"`` (especially followed by ``#``) may still be
    misparsed; it is not escaped because MySQL doesn't document a ``\\"`` sequence.

    :param user: user name; the ``user`` line is omitted when None so the client falls
        back to its own default instead of logging in as the literal user "None"
    :param password: password, None/empty means no password
    :param host: server host
    :param port: server port"""
    def esc(v): return str(v).replace("\\", "\\\\")
    lines = ["[client]"]
    if user is not None: lines.append(f'user = "{esc(user)}"')
    lines.append(f'password = "{esc(password or "")}"')
    lines.append(f'host = "{esc(host)}"')
    lines.append(f'port = "{esc(port)}"')
    return "\n".join(lines)
@contextmanager
def mysqlCnf(user, password, host, port):
    """Context manager that writes a temporary MySQL option file and yields its path.

    The file is passed to ``mysql``/``mysqldump`` via ``--defaults-file`` so that the
    password doesn't show up on the command line. The file is deleted on exit, even if
    the body raises."""
    fn = mysqlCnfText(user, password, host, port) | cli.file() # presumably writes to a new temp file and returns its path
    try:
        # best-effort: the file holds a password, so make it owner-readable only
        try: os.chmod(fn, 0o600)
        except OSError: pass
        yield fn
    finally: os.remove(fn)
# Database driver modules, each declared through k1.dep(<import name>, <pip package>, <homepage>).
# NOTE(review): presumably k1.dep defers the real import and only complains (with an install hint)
# when the driver is first used, so users only need the driver for the sql(mode=...) they pick — confirm.
# These names are used by sql.refresh(): mysqlConn for mode="my", pgConn for "pg", sqlite3 for "lite".
mysqlConn = k1.dep("mysql.connector", "mysql-connector-python", "https://pypi.org/project/mysql-connector-python/") # mysqlCnf
pgConn = k1.dep("psycopg2", "psycopg2-binary", "https://pypi.org/project/psycopg2/") # mysqlCnf
sqlite3 = k1.dep("sqlite3") # mysqlCnf
# Identifier quote character per sql mode, used to wrap table/column names in generated queries.
# MySQL uses backticks; PostgreSQL and SQLite use the ANSI-standard double quote. Quoting matters for
# PostgreSQL especially: unquoted identifiers are folded to lowercase, so a table named "Users" (as
# reported by information_schema) can't be selected without quotes. It also protects reserved words
# and names containing spaces in both engines.
qD = {"my": "`", "pg": '"', "lite": '"'}
class sql:
    def __init__(self, host, port=None, user=None, password="", mode="my"):
        """Creates a connection to a SQL database.

Example::

    s = sql("localhost") # creates a new sql object
    s.refresh() # refreshes connection any time you encounter strange bugs
    s | ls() # lists out all databases
    s | toBytes() # dumps every database, returns Iterator[str]. Yes, it's "toBytes", but it has the feeling of serializing whatever the input is, so it's at least intuitive in that way
    "dump.sql" | s # restores the database using the dump file
    cat("dump.sql") | s # restores the database using the dump file
    db1 = s | ls() | filt(lambda x: x.name == "db1") | item() # gets database named "db1"
    db1 = s | ls() | grep("db1") | item() # gets database with name that contains "db1" string, which is more convenient (could get "db1" or "db1_a" or "a_db1" btw, so be careful)
    db1 | ls() # list out all tables within this database
    db1 | toBytes() # dumps the database, returns Iterator[str]
    users = db1 | ls() | filt(lambda x: x.name == "user") | item() # gets table named "user"
    users = db1 | ls() | grep("user") | item() # gets table with name that contains "user", which is more convenient (could get "user" or "user_vehicle", so again, be careful)
    db1.query("select * from users") # queries the database using your custom query
    db1.query("select * from users where user_id=%s", 3) # queries with prepared statement (SQLite uses "?" placeholders instead of "%s")
    users.info() # prints out a preview of the first few rows of the table and the table schema
    users.cols # returns table's columns as List[str]
    users.query(...) # can also do a custom query, just like with databases
    users | toBytes() # dumps this specific table, returns Iterator[str]
    users | cat() | display() # reads entire table, gets first 10 rows and displays it out
    users | (cat() | head(20)) | display() # reads first 20 rows only, then displays the first 10 rows. The query sent selects every column explicitly, with "limit 20"
    users | (cat() | filt("x == 4", 3) | head(20)) | display() # grabs first 20 rows that has the 4th column equal to 4, then displays the first 10 rows. filt() isn't pushed down to SQL yet, so the query sent is "select user_id, address, balance, age from user" (assuming the table only has those columns) and the filtering and head() run in Python

:param host: host name, ip address, or file name (in case of sqlite)
:param port: port at the host. If not specified, defaults to 3306 for MySQL and 5432 for PostgreSQL. Unused for SQLite
:param user: database user name. If not specified then fallback to environment variable ``SQL_USER``, then ``USER``
:param password: database password. If not specified then fallback to environment variable ``SQL_PASSWORD``. If that's missing too, assume the database doesn't require one
:param mode: currently supports 3 values: "my" (MySQL), "pg" (PostgreSQL) and "lite" (SQLite)
"""
        # a hard-coded 3306 default used to send PostgreSQL connections to the MySQL port
        if port is None: port = {"my": 3306, "pg": 5432}.get(mode)
        self.host = host; self.port = port; self.user = user or os.environ.get("SQL_USER") or os.environ.get("USER")
        self.password = password or os.environ.get("SQL_PASSWORD"); self.db = None; self.mode = mode; self.conn = None; self.refresh()
    def refresh(self):
        """(Re)connects to the database, closing any existing connection first.
        Reconnects to the currently selected database (``self.db``), if any.

        :raises Exception: if ``mode`` is not one of "my", "pg", "lite" """
        if getattr(self, "conn", None) is not None:
            try: self.conn.close()
            except Exception: pass # best-effort: the old connection may already be broken, which is why we're refreshing
            self.conn = None
        kwargs = dict(host=self.host, port=self.port, user=self.user, password=self.password, database=self.db)
        if self.mode == "my": self.conn = mysqlConn.connect(**kwargs, charset='utf8mb4', collation='utf8mb4_general_ci')
        elif self.mode == "pg": self.conn = pgConn.connect(**kwargs)
        elif self.mode == "lite": self.conn = sqlite3.connect(self.host)
        else: raise Exception(f"Can only support 'my' (MySQL), 'pg' (PostgreSQL) and 'lite' (SQLite) for now, can't support {self.mode}")
    def _changeDb(self, db):
        """Switches the connection over to database ``db`` if it isn't selected already"""
        if self.db == db: return
        if self.mode == "my": self.query(f"use `{db.replace('`', '``')}`") # backticks inside identifiers are escaped by doubling
        elif self.mode == "pg": self.db = db; self.refresh() # PostgreSQL can't switch databases on a live connection, so reconnect
        elif self.mode == "lite": pass # a SQLite file is a single database
        self.db = db
    def _rollback(self):
        """Best-effort rollback after a failed statement. Without it, a PostgreSQL
        connection stays in an aborted transaction and rejects every later query."""
        try: self.conn.rollback()
        except Exception: pass
    def query(self, query, *args):
        """Executes ``query`` with optional positional parameters and commits.

        :param query: SQL string. Parameter placeholders are driver-specific ("%s" for MySQL/PostgreSQL, "?" for SQLite)
        :param args: values bound to the placeholders
        :returns: all fetched rows, or None if the driver raises when fetching (e.g. statements without a result set)
        :raises: whatever the driver raises on execution; the transaction is rolled back first"""
        cur = self.conn.cursor()
        try:
            # without parameters, don't hand the driver a params tuple, so literal "%" in the query isn't treated as a placeholder
            if args: cur.execute(query, args)
            else: cur.execute(query)
            try: ans = cur.fetchall()
            except Exception: ans = None # e.g. "no results to fetch" after INSERT/USE on some drivers
        except Exception:
            self._rollback(); raise
        finally: cur.close() # always release the cursor, even on failure
        self.conn.commit(); return ans
    def _ls(self):
        if self.mode == "my": return [sqldb(self, e[0]) for e in self.query("show databases")]
        elif self.mode == "pg": return [sqldb(self, e[0]) for e in self.query("select datname from pg_database where datistemplate=false")]
        elif self.mode == "lite": return [sqldb(self, "default")]
    def __repr__(self): return f"<sql mode={self.mode} host={self._host}>"
    @property
    def _host(self) -> str:
        """Short host description: the file name for SQLite, "host:port" otherwise"""
        if self.mode == "lite": return os.path.basename(self.host)
        else: return f"{self.host}:{self.port}"
    def _cnfCtx(self): return mysqlCnf(self.user, self.password, self.host, self.port)
    def _toBytes(self):
        if self.mode == "my":
            with self._cnfCtx() as fn: yield from None | cli.cmd(f"mysqldump --defaults-file={fn} --single-transaction --hex-blob --all-databases")
        else: raise Exception(f"All databases dump of mode {self.mode} is not supported yet")
    def __ror__(self, it): # restoring a backup
        """Restores a dump. ``it`` is either a path to a .sql file, or the dump's contents
        (which get written to a temporary file first). Only MySQL is supported.
        NOTE(review): the path is interpolated unquoted into a shell command line."""
        if self.mode != "my": raise Exception(f"Restoring database from .sql file of mode {self.mode} is not supported yet")
        def restore(fn):
            with self._cnfCtx() as cnfFn: None | cli.cmd(f"mysql --defaults-file={cnfFn} < {fn}") | cli.ignore()
        if isinstance(it, str): restore(it); return
        fn = it | cli.file()
        try: restore(fn)
        finally: os.remove(fn) # don't leak the temporary dump if the restore fails
class sqldb:
    def __init__(self, sql:"sql", name:str):
        """A sql database representation. Not expected to be instantiated by you. See also: :class:`sql`"""
        self.sql = sql; self.name = name
    def query(self, query, *args):
        """Runs ``query`` against this database, switching the shared connection over to it first if needed.
        See :meth:`sql.query` for the arguments and return value."""
        self.sql._changeDb(self.name); return self.sql.query(query, *args)
    def _ls(self):
        """Lists user tables of this database as :class:`sqltable` objects (None for an unknown mode)"""
        mode = self.sql.mode
        if mode == "my": listing = "show tables"
        # information_schema.tables also contains every system catalog relation; hide those,
        # matching the other modes which only list user tables
        elif mode == "pg": listing = "select table_name from information_schema.tables where table_schema not in ('pg_catalog', 'information_schema')"
        elif mode == "lite": listing = "select name from sqlite_master where type='table'"
        else: return None
        return [sqltable(self.sql, self, row[0]) for row in self.query(listing)]
    def __repr__(self): return f"<sqldb host={self.sql._host} db={self.name}>"
    def _toBytes(self):
        if self.sql.mode == "my":
            with self.sql._cnfCtx() as fn: yield from None | cli.cmd(f"mysqldump --defaults-file={fn} --single-transaction --hex-blob --databases {self.name}")
        else: raise Exception(f"Database dump of mode {self.sql.mode} is not supported yet")
    def __ror__(self, it):
        """Restoring a dump is done at the server level, see :meth:`sql.__ror__`"""
        return self.sql.__ror__(it)
class sqltable:
    def __init__(self, sql, sqldb, name:str):
        """A sql table representation. Not expected to be instantiated by you. See also: :class:`sql`"""
        self.sql = sql; self.sqldb = sqldb; self.name = name; self._cols = None
        self._desc = None # cached result of describe()
    @staticmethod
    def _headLimit(h):
        """Translates a head cli into a SQL "limit n" clause, or returns None when it can't be
        expressed in SQL (n is None, fractional, negative, or the head is inverted)"""
        n = h.n
        # None must be checked first: round(None) raises TypeError
        if n is None or h.inverted or n < 0 or round(n) != n: return None
        return f"limit {int(n)}" # int(): "limit 20.0" isn't valid SQL
    def _cat(self, ser):
        """Runs ``cat() | ser``. A leading cut() (column selection) and head() (row limit) are
        pushed into the SQL query; everything from the first non-optimizable cli onwards runs
        in Python on the query results."""
        cols = self.cols; _2 = [] # clis that can't be optimized, stashed away to be merged with ser later on
        q = qD[self.sql.mode]; clis = deque(ser.clis)
        o1 = None # cut() opt: explicit column list, or 0 when the cut can't be pushed down
        o2 = None # head() opt: "limit n" clause
        while len(clis) > 0:
            c = clis.popleft()
            if isinstance(c, cli.filt): _2.append(c); break # TODO: add optimizations for filt
            elif o2 is None and isinstance(c, cli.head):
                limit = self._headLimit(c)
                if limit is None: _2.append(c); break
                o2 = limit; continue
            elif o1 is None and isinstance(c, cli.cut):
                if isinstance(c.columns, slice): _2.append(c); o1 = 0; continue # slice cuts are applied in Python
                o1 = ", ".join([f"{q}{col}{q}" for col in cols | cli.rows(*c.columns)])
                # a single-column cut yields bare values rather than 1-element rows, so unwrap each row
                if len(c.columns) == 1: _2.append(cli.item().all() | cli.aS(list))
            else: _2.append(c); break
        o1 = o1 or ", ".join([f"{q}{col}{q}" for col in cols])
        query = f"select {o1} from {q}{self.name}{q} {o2 or ''}"
        return self.sqldb.query(query) | cli.serial(*_2, *clis)
    @property
    def cols(self):
        """Get column names, in the order reported by :meth:`describe`"""
        if not self._cols: self._cols = self.describe()[1:] | cli.cut({"my": 0, "pg": 0, "lite": 1}[self.sql.mode]) | cli.deref()
        return self._cols
    def describe(self):
        """Returns the table schema as a list of rows, the first row being the header.
        Cached per instance (replaces a method-level lru_cache, which kept every table object alive)."""
        if getattr(self, "_desc", None) is None: self._desc = self._describe()
        return self._desc
    def _describe(self):
        """Queries the schema, uncached (None for an unknown mode)"""
        mode = self.sql.mode
        if mode == "my": return self.sqldb.query(f"describe `{self.name.replace('`', '``')}`") | cli.insert(["Field", "Type", "Null", "Key", "Default", "Extra"]) | cli.deref()
        if mode == "pg":
            # parameterized instead of string-built; without "order by" the column order (and so cut(i) indices) is unspecified
            # NOTE(review): not filtered by schema, so a table name present in several schemas yields duplicated columns
            return self.sqldb.query("select column_name, data_type, is_nullable, column_default, ordinal_position from information_schema.columns where table_name=%s order by ordinal_position", self.name) | cli.insert(["column_name", "data_type", "is_nullable", "column_default", "ordinal_position"]) | cli.deref()
        if mode == "lite": return self.sqldb.query(f"pragma table_info([{self.name}])") | cli.insert(["cid", "name", "type", "notnull", "dflt_value", "pk"]) | cli.deref()
    def info(self, out=False):
        """Preview table: prints its name, up to 9 rows and the schema from :meth:`describe`

        :param out: if True, returns a list of lines instead of printing them out"""
        def gen():
            print(f"Table `{self.name}`\n")
            desc = self.describe() | cli.deref(); cols = self.cols; q = qD[self.sql.mode]; s = ", ".join([f"{q}{e}{q}" for e in cols])
            self.sqldb.query(f"select {s} from {q}{self.name}{q} limit 9") | (cli.aS(repr) | cli.head(50)).all(2) | cli.insert(cols) | cli.display(); print("")
            desc | cli.display(None)
        if not out: gen(); return
        with k1.captureStdout() as captured: gen()
        return captured()
    def query(self, query, *args): return self.sqldb.query(query, *args)
    def __repr__(self): return f"<sqltable host={self.sql._host} db={self.sqldb.name} table={self.name}>"
    def _toBytes(self):
        if self.sql.mode == "my":
            with self.sql._cnfCtx() as fn: yield from None | cli.cmd(f"mysqldump --defaults-file={fn} --single-transaction --hex-blob {self.sqldb.name} {self.name}")
        else: raise Exception(f"Table dump of mode {self.sql.mode} is not supported yet")
    def __ror__(self, it): return self.sql.__ror__(it)
# boto3 is declared as the dependency behind the S3 helpers below. Note that s3/s3bucket take an
# already-constructed client from the caller, so this name itself isn't referenced by them.
boto3 = k1.dep("boto3") # sqltable
class s3:
    def __init__(self, client):
        """Represents an S3 client.

Example::

    client = boto3.client("s3", ...) # put your credentials and details here
    db = s3(client) # creates an S3 manager
    db | ls() # lists all buckets accessible
    bucket = db | ls() | item() # grabs the first bucket, returns object of type s3bucket
    bucket = s3bucket(client, "bucket-name") # or you can instantiate the bucket directly
    bucket | ls() # lists all objects within this bucket
    bucket | ls() | grep("\\.so") # grabs all .so files from the bucket
    obj = bucket | ls() | item() # grabs the first object within this bucket, returns object of type s3obj
    obj.key, obj.size, obj.lastModified # some fields directly accessible

This mostly offers interoperability with ls() and cat(), so that you can
write relatively intuitive code, but fundamentally provides no upsides

:param client: a boto3 S3 client"""
        self.client = client
    def _ls(self):
        """One :class:`s3bucket` per bucket reported by the client's ``list_buckets()``"""
        buckets = self.client.list_buckets()["Buckets"]
        return [s3bucket(self.client, bucket["Name"]) for bucket in buckets]
    def __repr__(self):
        return "<kaws.s3 client>"
class s3bucket:
    def __init__(self, client, name:str):
        """Represents an S3 bucket.

See also: :class:`s3`

:param client: a boto3 S3 client
:param name: bucket name"""
        self.client = client; self.name = name
    def _ls(self):
        """Lists every object in this bucket as :class:`s3obj` objects.
        A single ``list_objects`` call returns at most 1000 keys, so keep paging while the response is truncated."""
        client = self.client; name = self.name; objs = []; kwargs = {"Bucket": name}
        while True:
            resp = client.list_objects(**kwargs)
            contents = resp.get("Contents", [])
            objs.extend(s3obj(client, name, data) for data in contents)
            if not resp.get("IsTruncated") or not contents: return objs
            # NextMarker is only returned when a Delimiter is given; otherwise the last key is the next marker
            kwargs["Marker"] = resp.get("NextMarker") or contents[-1]["Key"]
    def __repr__(self): return f"<s3bucket name='{self.name}'>"
class s3obj:
    def __init__(self, client, bucket:str, data):
        """Represents an S3 object. Not intended to be instantiated directly.

See also: :class:`s3`

:param client: a boto3 S3 client
:param bucket: name of the bucket containing this object
:param data: one entry of a ``list_objects`` response's "Contents" list"""
        self.client = client; self.bucket = bucket
        self.key = data["Key"]; self.lastModified = data["LastModified"]
        # some S3-compatible services omit StorageClass, so don't hard-fail on it
        self.size = data["Size"]; self.storageClass = data.get("StorageClass")
    def __repr__(self):
        from k1lib import fmt # `fmt` was never imported in this module, so repr used to raise NameError
        return f"<s3obj bucket='{self.bucket}' key='{self.key}' size={fmt.size(self.size)}>"
    def _cat(self, kwargs):
        """Reads bytes [sB, eB) of the object (eB < 0 means until the end) via a ranged GET.

        :raises Exception: for ``cat(text=True)`` or ``cat(chunks=True)``, which aren't supported"""
        if kwargs["text"] is not False: raise Exception("s3obj does not support `cat(text=True)`. Please use `cat(text=False)` instead")
        if kwargs["chunks"]: raise Exception("s3obj does not support `cat(chunks=True)`")
        sB = kwargs["sB"]; eB = kwargs["eB"]
        eB = self.size if eB < 0 else min(eB, self.size)
        # an empty range can't be expressed as "bytes=a-b" (e.g. "bytes=0--1"); S3 ignores such an invalid Range
        # header and returns the whole object, so answer it locally. NOTE(review): negative sB is passed through as-is
        if sB >= eB: return b""
        return self.client.get_object(Bucket=self.bucket, Key=self.key, Range=f'bytes={sB}-{eB-1}')["Body"].read()