Source code for k1lib.cli.lsext

# AUTOGENERATED FILE! PLEASE DON'T EDIT HERE. EDIT THE SOURCE NOTEBOOKS INSTEAD
import k1lib as k1, os
import k1lib.cli as cli
from collections import deque
from functools import lru_cache
from contextlib import contextmanager
# Names exported by a star-import of this module. The other classes defined
# below (sqldb, sqltable, s3obj) are documented as not meant to be instantiated directly.
__all__ = ["sql", "s3", "s3bucket"]
@contextmanager
def mysqlCnf(user, password, host, port):
    """Context manager yielding the path of a MySQL option file with a ``[client]`` section.

    The option-file text is piped into ``cli.file()``, whose result is used as a
    file name: it is yielded to the caller and removed again on exit, whether or
    not the ``with`` body raised.

    :param user: value written to the ``user`` option
    :param password: value written to the ``password`` option. Falsy values are written as an empty string
    :param host: value written to the ``host`` option
    :param port: value written to the ``port`` option"""
    cnfLines = [
        "[client]",
        f'user = "{user}"',
        f'password = "{password or ""}"',
        f'host = "{host}"',
        f'port = "{port}" ',  # trailing space is part of the emitted text
    ]
    cnfPath = "\n".join(cnfLines) | cli.file()
    try:
        yield cnfPath
    finally:
        os.remove(cnfPath)
# Database driver handles, one per supported mode ("my", "pg", "lite").
# NOTE(review): k1.dep() presumably resolves the module lazily and its arguments
# look like (module name, pip package name, project url) - confirm against k1lib.
mysqlConn = k1.dep("mysql.connector", "mysql-connector-python", "https://pypi.org/project/mysql-connector-python/") # mysqlCnf
pgConn = k1.dep("psycopg2", "psycopg2-binary", "https://pypi.org/project/psycopg2/") # mysqlCnf
sqlite3 = k1.dep("sqlite3")                                                      # mysqlCnf
# Character wrapped around identifiers when building queries: backtick for
# MySQL, nothing for PostgreSQL and SQLite.
qD = {"my": "`", "pg": "", "lite": ""} # quote dict                              # mysqlCnf
class sql:
    def __init__(self, host, port=3306, user=None, password="", mode="my"):
        """Creates a connection to a SQL database. Example::

            s = sql("localhost") # creates a new sql object
            s.refresh() # refreshes connection any time you encounter strange bugs
            s | ls() # lists out all databases
            s | toBytes() # dumps every database, returns Iterator[str]. Yes, it's "toBytes", but it has the feeling of serializing whatever the input is, so it's at least intuitive in that way
            "dump.sql" | s # restores the database using the dump file
            cat("dump.sql") | s # restores the database using the dump file

            db1 = s | ls() | filt(lambda x: x.name == "db1") | item() # gets database named "db1"
            db1 = s | ls() | grep("db1") | item() # gets database with name that contains "db1" string, which is more convenient (could get "db1" or "db1_a" or "a_db1" btw, so be careful)
            db1 | ls() # list out all tables within this database
            db1 | toBytes() # dumps the database, returns Iterator[str]

            users = db1 | ls() | filt(lambda x: x.name == "user") | item() # gets table named "user"
            users = db1 | ls() | grep("user") | item() # gets table with name that contains "user", which is more convenient (could get "user" or "user_vehicle", so again, be careful)
            db1.query("select * from users") # queries the database using your custom query
            db1.query("select * from users where user_id=%s", 3) # queries with prepared statement

            users.info() # prints out the first 10 rows of the table and the table schema
            users.cols # returns table's columns as List[str]
            users.query(...) # can also do a custom query, just like with databases
            users | toBytes() # dumps this specific table, returns Iterator[str]
            users | cat() | display() # reads entire table, gets first 10 rows and displays it out
            users | (cat() | head(20)) | display() # reads first 20 rows only, then displays the first 10 rows. Query sent is "select * from user limit 20"
            users | (cat() | filt("x == 4", 3) | head(20)) | display() # grabs first 20 rows that has the 4th column equal to 4, then displays the first 10 rows. filt() is not pushed down into SQL yet, so every row is fetched and the filter and head run on the client

        :param host: host name, ip address, or file name (in case of sqlite)
        :param port: port at the host
        :param user: database user name. If not specified then fallback to environment variable ``SQL_USER``, then ``USER``
        :param password: database password. If not specified then fallback to environment variable ``SQL_PASSWORD``.
            If that is not set either then assume database doesn't require one
        :param mode: currently supports 3 values: "my" (MySQL), "pg" (PostgreSQL) and "lite" (SQLite) """
        self.host = host
        self.port = port
        self.user = user or os.environ.get("SQL_USER") or os.environ.get("USER")
        self.password = password or os.environ.get("SQL_PASSWORD")
        self.db = None      # name of the database currently selected, see _changeDb()
        self.mode = mode
        self.conn = None
        self.refresh()

    def refresh(self):
        """Closes the current connection (if there is one) and opens a fresh one
        using the stored host, port, credentials and currently selected database.

        :raises Exception: if ``mode`` is not one of "my", "pg" or "lite" """
        if self.conn is not None:
            # best-effort: the old connection is being thrown away, so a failure
            # to close it must not prevent reconnecting
            try: self.conn.close()
            except Exception: pass
            self.conn = None
        kwargs = dict(host=self.host, port=self.port, user=self.user, password=self.password, database=self.db)
        if self.mode == "my":
            self.conn = mysqlConn.connect(**kwargs, charset='utf8mb4', collation='utf8mb4_general_ci')
        elif self.mode == "pg":
            self.conn = pgConn.connect(**kwargs)
        elif self.mode == "lite":
            self.conn = sqlite3.connect(self.host)  # host doubles as the file name here
        else:
            raise Exception(f"Can only support 'my' (MySQL), 'pg' (PostgreSQL) and 'lite' (SQLite) for now, can't support {self.mode}")

    def _changeDb(self, db):
        """Makes ``db`` the active database, if it isn't already.

        MySQL switches with a ``use`` statement on the existing connection,
        PostgreSQL reconnects with the new database name, SQLite has nothing to switch."""
        if self.db == db: return
        if self.mode == "my":
            self.query(f"use `{db}`")
        elif self.mode == "pg":
            self.db = db  # refresh() reads self.db, so it has to be set before reconnecting
            self.refresh()
        self.db = db

    def query(self, query, *args):
        """Executes a query on the current connection and commits.

        :param query: SQL text, may contain driver-style placeholders
        :param args: values for the placeholders, passed to the cursor as a tuple
        :return: all fetched rows, or None if the cursor has no rows to fetch"""
        cur = self.conn.cursor()
        try:
            cur.execute(query, args)
            try: ans = cur.fetchall()
            except Exception: ans = None  # statement produced no result set
        finally:
            cur.close()  # also runs when execute() raises, so the cursor never leaks
        self.conn.commit()
        return ans

    def _ls(self):
        """Lists databases as :class:`sqldb` objects. SQLite yields a single one named "default"."""
        if self.mode == "my":
            return [sqldb(self, e[0]) for e in self.query("show databases")]
        elif self.mode == "pg":
            return [sqldb(self, e[0]) for e in self.query("select datname from pg_database where datistemplate=false")]
        elif self.mode == "lite":
            return [sqldb(self, "default")]

    def __repr__(self): return f"<sql mode={self.mode} host={self._host}>"

    @property
    def _host(self) -> str:
        """Short label of the target: file name for SQLite, ``host:port`` otherwise."""
        if self.mode == "lite": return self.host.split(os.sep)[-1]
        return f"{self.host}:{self.port}"

    def _cnfCtx(self):
        """Context manager yielding an option file holding this object's credentials."""
        return mysqlCnf(self.user, self.password, self.host, self.port)

    def _toBytes(self):
        """Yields the output of ``mysqldump --all-databases``. MySQL only.

        :raises Exception: for any other mode. Since this is a generator, it's raised on first iteration"""
        if self.mode == "my":
            with self._cnfCtx() as fn:
                yield from None | cli.cmd(f"mysqldump --defaults-file={fn} --single-transaction --hex-blob --all-databases")
        else:
            raise Exception(f"All databases dump of mode {self.mode} is not supported yet")

    def __ror__(self, it):  # restoring a backup
        """Restores a dump with the ``mysql`` command line tool. MySQL only.

        :param it: if a string, taken as the dump file name. Anything else is piped
            into ``cli.file()`` first and the resulting file is removed afterwards
        :raises Exception: for any other mode"""
        if self.mode != "my":
            raise Exception(f"Restoring database from .sql file of mode {self.mode} is not supported yet")
        def restore(fn):
            with self._cnfCtx() as cnfFn:
                None | cli.cmd(f"mysql --defaults-file={cnfFn} < {fn}") | cli.ignore()
        if isinstance(it, str):
            restore(it)
        else:
            fn = it | cli.file()
            try: restore(fn)
            finally: os.remove(fn)  # don't leave the intermediate dump behind if the restore fails
class sqldb:
    def __init__(self, sql:sql, name:str):
        """A sql database representation. Not expected to be instantiated by you.
See also: :class:`sql`

:param sql: the connection object this database belongs to
:param name: database name"""
        self.sql = sql
        self.name = name

    def query(self, query, *args):
        """Selects this database on the shared connection, then runs the query on it."""
        self.sql._changeDb(self.name)
        return self.sql.query(query, *args)

    def _ls(self):
        """Lists the tables of this database as :class:`sqltable` objects."""
        mode = self.sql.mode
        if mode == "my": return [sqltable(self.sql, self, e[0]) for e in self.query("show tables")]
        if mode == "pg": return [sqltable(self.sql, self, e[0]) for e in self.query("select table_name from information_schema.tables")]
        if mode == "lite": return [sqltable(self.sql, self, e[0]) for e in self.query("select name from sqlite_master where type='table'")]

    def __repr__(self): return f"<sqldb host={self.sql._host} db={self.name}>"

    def _toBytes(self):
        """Yields the output of ``mysqldump`` for this database. MySQL only.

:raises Exception: for any other mode. Since this is a generator, it's raised on first iteration"""
        if self.sql.mode == "my":
            with self.sql._cnfCtx() as fn:
                yield from None | cli.cmd(f"mysqldump --defaults-file={fn} --single-transaction --hex-blob --databases {self.name}")
        else:
            raise Exception(f"Database dump of mode {self.sql.mode} is not supported yet")

    def __ror__(self, it): return self.sql.__ror__(it)  # restoring is delegated to the connection object


class sqltable:
    def __init__(self, sql, sqldb, name:str):
        """A sql table representation. Not expected to be instantiated by you.
See also: :class:`sql`

:param sql: the connection object
:param sqldb: the database this table lives in
:param name: table name"""
        self.sql = sql
        self.sqldb = sqldb
        self.name = name
        self._cols = None   # cached column names, filled in by .cols
        self._desc = None   # cached schema rows, filled in by .describe()

    def _cat(self, ser):
        """Reads the table, folding leading ``head()`` and ``cut()`` clis of ``ser``
into the select statement where possible.

Clis are consumed left to right. An integer, non-negative, non-inverted
``head`` becomes a ``limit`` clause, and a ``cut`` with explicit column indexes
becomes the select list. The first cli that can't be folded in stops the scan,
and it together with everything after it is applied to the fetched rows.

:param ser: object whose ``.clis`` holds the clis that follow ``cat()``"""
        cols = self.cols
        q = qD[self.sql.mode]
        leftover = []             # clis that can't be optimized, merged with the unscanned ones at the end
        clis = deque(ser.clis)
        selectCols = None         # cut() opt
        limit = None              # head() opt
        while len(clis) > 0:
            c = clis.popleft()
            if isinstance(c, cli.filt):
                leftover.append(c); break  # TODO: add optimizations for filt
            elif limit is None and isinstance(c, cli.head):
                # None has to be tested first, as round(None) raises TypeError
                if c.n is None or c.inverted or c.n < 0 or round(c.n) != c.n:
                    leftover.append(c); break
                limit = f"limit {c.n}"
            elif selectCols is None and isinstance(c, cli.cut):
                if isinstance(c.columns, slice):
                    # Can't translate a slice, so keep the cut for later. 0 is falsy, so all
                    # columns still get selected below, but it's not None, so no later cut()
                    # will be folded in either
                    leftover.append(c); selectCols = 0
                else:
                    selectCols = ", ".join([f"{q}{col}{q}" for col in cols | cli.rows(*c.columns)])
                    # NOTE(review): presumably a single-column cut() yields bare values
                    # instead of rows, so unwrap the 1-element rows to match - confirm
                    if len(c.columns) == 1: leftover.append(cli.item().all() | cli.aS(list))
            else:
                leftover.append(c); break
        selectCols = selectCols or ", ".join([f"{q}{col}{q}" for col in cols])
        query = f"select {selectCols} from {q}{self.name}{q} {limit or ''}"
        return self.sqldb.query(query) | cli.serial(*leftover, *clis)

    @property
    def cols(self):
        """Get column names"""
        if not self._cols:
            nameIdx = {"my": 0, "pg": 0, "lite": 1}[self.sql.mode]  # position of the name within a schema row
            self._cols = self.describe()[1:] | cli.cut(nameIdx) | cli.deref()  # [1:] skips the header row
        return self._cols

    def describe(self):
        """Returns the table schema, with a header row inserted first.

The result is cached on this instance, so the database is asked only once
per table object."""
        if self._desc is None: self._desc = self._describe()
        return self._desc

    def _describe(self):
        """Queries the schema from the database, uncached."""
        mode = self.sql.mode
        if mode == "my":
            return self.sqldb.query(f"describe `{self.name}`") | cli.insert(["Field", "Type", "Null", "Key", "Default", "Extra"]) | cli.deref()
        if mode == "pg":
            # table name goes in as a query parameter rather than being pasted into the string literal
            return self.sqldb.query("select column_name, data_type, is_nullable, column_default, ordinal_position from information_schema.columns where table_name=%s", self.name) | cli.insert(["column_name", "data_type", "is_nullable", "column_default", "ordinal_position"]) | cli.deref()
        if mode == "lite":
            return self.sqldb.query(f"pragma table_info([{self.name}])") | cli.insert(["cid", "name", "type", "notnull", "dflt_value", "pk"]) | cli.deref()

    def info(self, out=False):
        """Preview table

:param out: if True, returns a list of lines instead of printing them out"""
        def gen():
            print(f"Table `{self.name}`\n")
            desc = self.describe() | cli.deref()
            cols = self.cols
            q = qD[self.sql.mode]
            s = ", ".join([f"{q}{e}{q}" for e in cols])
            # every cell is repr()-ed and cut down to 50 characters before being displayed
            self.sqldb.query(f"select {s} from {q}{self.name}{q} limit 9") | (cli.aS(repr) | cli.head(50)).all(2) | cli.insert(cols) | cli.display()
            print("")
            desc | cli.display(None)
        if out:
            with k1.captureStdout() as captured: gen()
            return captured()
        gen()

    def query(self, query, *args): return self.sqldb.query(query, *args)

    def __repr__(self): return f"<sqltable host={self.sql._host} db={self.sqldb.name} table={self.name}>"

    def _toBytes(self):
        """Yields the output of ``mysqldump`` for this table. MySQL only.

:raises Exception: for any other mode. Since this is a generator, it's raised on first iteration"""
        if self.sql.mode == "my":
            with self.sql._cnfCtx() as fn:
                yield from None | cli.cmd(f"mysqldump --defaults-file={fn} --single-transaction --hex-blob {self.sqldb.name} {self.name}")
        else:
            raise Exception(f"Table dump of mode {self.sql.mode} is not supported yet")

    def __ror__(self, it): return self.sql.__ror__(it)  # restoring is delegated to the connection object


boto3 = k1.dep("boto3")
class s3:
    def __init__(self, client):
        """Represents an S3 client. Example::

            client = boto3.client("s3", ...) # put your credentials and details here
            db = s3(client) # creates an S3 manager
            db | ls() # lists all buckets accessible
            bucket = db | ls() | item() # grabs the first bucket, returns object of type s3bucket
            bucket = s3bucket(client, "bucket-name") # or you can instantiate the bucket directly
            bucket | ls() # lists all objects within this bucket
            bucket | ls() | grep("\\.so") # grabs all .so files from the bucket
            obj = bucket | ls() | item() # grabs the first object within this bucket, returns object of type s3obj
            obj.key, obj.size, obj.lastModified # some fields directly accessible

        This mostly offers interoperability with ls() and cat(), so that you can write
        relatively intuitive code, but fundamentally provides no upsides

        :param client: the client object, stored as is. Only its ``list_buckets()`` method is used here"""
        self.client = client

    def _ls(self):
        """Lists buckets as :class:`s3bucket` objects. A response without a "Buckets" entry gives an empty list."""
        return [s3bucket(self.client, x["Name"]) for x in self.client.list_buckets().get("Buckets", [])]

    def __repr__(self): return "<kaws.s3 client>"
class s3bucket:
    def __init__(self, client, name:str):
        """Represents an S3 bucket. See also: :class:`s3`

        :param client: the client object, stored as is
        :param name: bucket name"""
        self.client = client
        self.name = name

    def _ls(self):
        """Lists every object in this bucket as :class:`s3obj` objects.

        Goes through the client's ``list_objects`` paginator, because a single
        ``list_objects`` call only returns one page of keys and would silently
        cut off larger buckets."""
        client = self.client
        name = self.name
        pages = client.get_paginator("list_objects").paginate(Bucket=name)
        # a page of an empty bucket has no "Contents" entry at all
        return [s3obj(client, name, data) for page in pages for data in page.get("Contents", [])]

    def __repr__(self): return f"<s3bucket name='{self.name}'>"
class s3obj:
    def __init__(self, client, bucket:str, data):
        """Represents an S3 object. Not intended to be instantiated directly.
        See also: :class:`s3`

        :param client: the client object, stored as is
        :param bucket: name of the bucket holding this object
        :param data: mapping with "Key", "LastModified", "Size" and "StorageClass" entries"""
        self.client = client
        self.bucket = bucket
        self.key = data["Key"]
        self.lastModified = data["LastModified"]
        self.size = data["Size"]
        self.storageClass = data["StorageClass"]

    def __repr__(self):
        # bare `fmt` is never imported in this file, so go through the k1lib package instead.
        # NOTE(review): assumes k1lib exposes `fmt` at top level - confirm
        return f"<s3obj bucket='{self.bucket}' key='{self.key}' size={k1.fmt.size(self.size)}>"

    def _cat(self, kwargs):
        """Downloads a byte range of the object and returns it as bytes.

        :param kwargs: mapping with "text", "chunks", "sB" and "eB" entries. "sB" is the
            first byte to read, "eB" is one past the last byte, and a negative "eB"
            means read up to the end of the object
        :raises Exception: if "text" is anything other than False, or "chunks" is truthy"""
        if kwargs["text"] is not False:
            raise Exception("s3obj does not support `cat(text=True)`. Please use `cat(text=False)` instead")
        if kwargs["chunks"]:
            raise Exception("s3obj does not support `cat(chunks=True)`")
        sB = kwargs["sB"]
        eB = kwargs["eB"]
        if eB < 0: eB = self.size
        # the Range header is inclusive on both ends, hence eB-1
        return self.client.get_object(Bucket=self.bucket, Key=self.key, Range=f'bytes={sB}-{eB-1}')["Body"].read()