PK!zkattdophon_db/__init__.pyfrom dophon_properties import * # from .utils import show_banner get_properties([DOPHON, DB]) # show_banner()PK! dophon_db/Connection.py# coding: utf-8 # 连接包装类 import pymysql from dophon_db import properties from dophon_logger import * logger = get_logger(DOPHON) """ mysql连接实例(半成品) 待实现:; 1.配置文件配置连接属性 author:CallMeE date:2018-06-01 """ logger.inject_logger(globals()) namespace = [ '_host', '_user', '_password', '_database', '_port', '_unix_socket', '_charset', '_sql_mode', '_read_default_file', '_conv', '_use_unicode', '_client_flag', '_cursorclass', '_init_command', '_connect_timeout', '_ssl', '_read_default_group', '_compress', '_named_pipe', '_no_delay', '_autocommit', '_db', '_passwd', '_local_infile', '_max_allowed_packet', '_defer_connect', '_auth_plugin_map', '_read_timeout', '_write_timeout', '_bind_address', '_binary_prefix' ] class Connection(pymysql.connections.Connection): ''' 数据库连接参数默认为连接本地root账户 目前只支持配置参数以及默认值: _host = 'localhost' _user = 'root' _password = 'root' _database = None ''' _host = properties.pydc_host _user = properties.pydc_user _password = properties.pydc_password _database = properties.pydc_database _port = properties.pydc_port _charset = 'utf8' def __init__(self, **kwargs): __host = kwargs['host'] if 'host' in kwargs else (kwargs['__host'] if '__host' in kwargs else self._host) __port = kwargs['port'] if 'port' in kwargs else (kwargs['__port'] if '__port' in kwargs else self._port) __user = kwargs['user'] if 'user' in kwargs else (kwargs['__user'] if '__user' in kwargs else self._user) __password = kwargs['password'] if 'password' in kwargs else ( kwargs['__password'] if '__password' in kwargs else self._password) __database = kwargs['database'] if 'database' in kwargs else ( kwargs['__database'] if '__database' in kwargs else self._database) __charset = kwargs['charset'] if 'charset' in kwargs else ( kwargs['__charset'] if '__charset' in kwargs else self._charset) super(Connection, self).__init__(host=__host, port=__port, user=__user, password=__password, database=__database, charset=__charset) def get_connect(self): return self def test_conn(self): try: self.ping() return True except: return False def re_conn(self): self.__init__(self._host, self._port, self._user, self._pwd, self._db, charset='utf8') PK!??dophon_db/const/__init__.pyfrom dophon_db.const import regix_str REGIXSTR = regix_str PK!m{dophon_db/const/regix_str.py# coding:utf-8 get_mapper_file_name = '\\..*' get_at_least_one_blank = '\\s+' mapper_bound_field_check = '(\#|\$|\!|\@|\?)\{.*\}' select_sql_check = '^\\s*(s|S)(e|E)(l|L)(e|E)(c|C)(t|T)\\s+.+' PK!a/dophon_db/mysql/__init__.pyfrom dophon_db import properties from .core import * from .core import CurObj from . import cluster from dophon_db.mysql import PageHelper from dophon_db.mysql.remote.Cell import Cell from dophon_db.mysql import binlog from dophon_db.mysql.binlog import Schued from dophon_db.mysql.sql_util import * from dophon_logger import * logger = get_logger(DOPHON) ALL = -1 SINGLE = 0 CLUSTER = 1 # 默认优先模式是分片 priority_mode = properties.db_mode if isinstance(properties.db_mode, int) else eval(properties.db_mode) db_cluster = cluster.get_db from .single import getDbObj, getPgObj db_obj = None pg_obj = None def assert_single(): return (db_obj if priority_mode == SINGLE or priority_mode == ALL else True) \ and \ (pg_obj if priority_mode == SINGLE or priority_mode == ALL else True) def assert_cluster(): return db_cluster if priority_mode == CLUSTER or priority_mode == ALL else True def mode_assert(mode_type: str): def fun(f): def arg_bucket(*args, **kwargs): assert priority_mode == eval(mode_type) or priority_mode == ALL, f'模式异常,请切换至({mode_type})或(ALL)' return f(*args, **kwargs) return arg_bucket return fun if priority_mode == CLUSTER: cluster.init_cluster_manager() # assert assert_cluster(), f'未切换到使用多源模式,当前模式为(SINGLE)' if priority_mode == SINGLE: db_obj = mode_assert('SINGLE')(getDbObj) pg_obj = mode_assert('SINGLE')(getPgObj) # assert assert_single(), f'未切换到使用单源模式,当前模式为(CLUSTER)' if priority_mode == ALL: cluster.init_cluster_manager() db_obj = mode_assert('SINGLE')(getDbObj) pg_obj = mode_assert('SINGLE')(getPgObj) # assert assert_cluster(), f'未切换到使用多源模式,当前模式为(SINGLE)' # assert assert_single(), f'未切换到使用单源模式,当前模式为(CLUSTER)' __all__ = ['db_obj', 'pg_obj', 'db_cluster'] PK!"dophon_db/mysql/binlog/__init__.py# coding: utf-8 from dophon_db.mysql.binlog import ZipBinLog from dophon_logger import * logger = get_logger(DOPHON) """ xml文件增强功能封装单元 author:CallMeE date:2018-06-01 """ logger.inject_logger(globals()) class BinCache(): _bin = '' _file = '' _false_fun = None def __init__(self, file): # 初始化原始bin对象 self._bin = ZipBinLog.zip_as_bin(file) self._file = file def set_false_fun(self, fun): self._false_fun = fun def chk_diff(self): if self._bin == ZipBinLog.zip_as_bin(self._file): pass else: logger.info(f'文件发生增量更新({self._file})') # 执行增量更新方法 if not self._false_fun: pass else: self._false_fun() # 重新初始化对象binlog信息 self._bin = ZipBinLog.zip_as_bin(self._file) def def_false_fun(file): logger.info(f'xml文件发生增量改变!({file})') PK!զm dophon_db/mysql/binlog/Schued.py# coding: utf-8 from threading import Thread import schedule, time from dophon_logger import * logger = get_logger(DOPHON) _sched = schedule.Scheduler() """ 调度器 author:CallMeE date:2018-06-01 """ logger.inject_logger(globals()) class sech_obj: def __init__(self, fun, delay): self.__fun = fun self.__delay = delay def enter(self): global _scheds _sched.every(self.__delay).seconds.do(self.__fun) def run_target(self): return self.__fun def run(): while True: _sched.run_pending() time.sleep(1) def start_sech(): """ 启动xml更新 :return: """ logger.info('xml自动更新调度启动') Thread(target=run).start() PK!A0#dophon_db/mysql/binlog/ZipBinLog.py# coding: utf-8 import re, uuid, zlib from dophon_db.const import * """ binlog工具 author:CallMeE date:2018-06-01 """ def zip_as_bin(file): f = open(file, 'r', encoding='utf8') bstr = f.read() # 去除注释与空格,换行等 bstr = re.sub(REGIXSTR.get_at_least_one_blank, ' ', re.sub('', ' ', bstr)) # 压缩内容 z_s = zip_str(bstr) bs = [] for b in str.encode(bstr): bs.append(b) # 返回唯一标识值(文件存储用文件名原子的uuid,文件标识用压缩内容原子的uuid) return uuid.uuid3(uuid.NAMESPACE_DNS, bstr) # 暂时弃用 def bindigits(n, bits): s = bin(n & int("1" * bits, 2))[2:] return ("{0:0>%s}" % (bits)).format(s) def zip_str(string): # 压缩内容 z_result = zlib.compress(string.encode(encoding='utf8')) return z_result def un_zip_str(string): # 解压内容 un_z_result = zlib.decompress(string).decode(encoding='utf8') return un_z_result PK!ha dophon_db/mysql/cluster.pyfrom dophon_db import properties from .core import CurObj from dophon_db import Pool, Connection from dophon_db.mysql.remote.Cell import Cell import re from dophon_logger import * get_logger(DOPHON).inject_logger(globals()) obj_manager = {} # 定义项目路径 project_path = properties.project_root cluster_manager = {} def get_manager(): return cluster_manager def get_db(path, debug: bool = False, auto_fix: bool = False): """ :param path: :param debug: :param auto_fix: :return: """ x = lambda p: p.split('::') if isinstance(path, str): from . import mode_assert # 切分出别名和路径 cluster_path = mode_assert('CLUSTER')(x)(path) assert len(cluster_path) > 1, '无法解释的路径格式' if auto_fix: pattern = re.sub(r'\\', r'\\\\', re.sub('/', '\\/', project_path)) if not re.search(pattern, cluster_path[1]): r_path = str(project_path) + str(cluster_path[1]) else: r_path = str(cluster_path[1]) else: r_path = cluster_path[1] # 返回预设连接池所生成的数据库对象 alias_name = cluster_path[0] if alias_name in cluster_manager: return CurObj(r_path, True, debug, db=cluster_manager[alias_name]) elif alias_name == 'x': # 暂定别名,由结果集定义 undefined = CurObj(r_path, True, debug) db_alias = undefined.sql.alias_for assert db_alias, f'结果集别名不存在{db_alias}' setattr(undefined, '_pool', cluster_manager[db_alias]) return undefined else: raise KeyError(f'别名不存在{alias_name}') elif isinstance(path, Cell): logger.error('暂不支持远程映射集') else: raise TypeError('结果集路径类型错误') def init_cluster_manager(): if hasattr(properties, 'db_cluster'): clusters = properties.db_cluster # 初始化分片连接池 logger.info('初始化分片连接池') for cluster in clusters: pool = Pool.Pool() pool.init_pool(properties.pool_conn_num, Connection.Connection, conn_kwargs=cluster) # print(f'{cluster["alias"]}--{pool.cache_conn_kwargs}') # print(id(pool)) cluster_manager[cluster['alias']] = pool logger.info('分片连接池初始化完毕') PK!*C*]*]dophon_db/mysql/core.py# coding: utf-8 import dophon_db.reader as reader from dophon_db.mysql import PageHelper from dophon_db import Connection from dophon_db import Pool from dophon_db import utils import threading import re from dophon_db.mysql import binlog from dophon_db.mysql.binlog import Schued from dophon_db import properties from dophon_db.mysql.sql_util import * from dophon_db.const import * from dophon_logger import * from .sql_util import transfer_to_mapper_key logger = get_logger(DOPHON) """ 后续开发; (完成)1.远程读取mapperxml (完成)2.语句对象定时更新暂定为本地xml文件增量binlog更新,后续开发远程增量更新(流程未定) (完成)2.1本地binlog处理对比 (完成)2.2远程binlog处理对比 (半完成)3.binlog生成算法以及对比算法以及增量写入 (完成)4.细粒度事务控制 (完成)5.配置文件配置连接数 (完成)6.参照mybatis完成sql骨架拼写 单条语句执行demo: # obj = getDbObj(project_path + '/mappers/ShopGoodsMapper.xml') # setObjUpdateRound(obj, '2') # obj.exe_sql("findGoodsList") 批量语句执行DEMO: # obj=getDbObj(path=project_path +'/mysql/test.xml',debug=True) # obj.exe_sql_obj_queue(queue_obj={"test":(1,2),"test":(2,3)}) 或者 # obj.exe_sql_queue(method_queue=['test','test','test_s','test','test'],args_queue=[('1','2'),('2','3'),(),('3','4'),('3','4')]) """ # 定义项目路径 project_path = properties.project_root logger.inject_logger(globals()) class BlockingThreadError(Exception): pass class CurObj: _page = False _db = None _cursor = None _conn = None _debug = False _cursor = None sql = None lock = threading.Lock() def __init__(self, path: str, poolFlag: bool, debug: bool, db=None, proxy_getter: bool = True): """ 初始化结果集对象 :param db: 连接工具实例(连接池或单个连接) :param path: 结果集管理xml路径 :param poolFlag: 连接池标识 :param debug: 调试标识 :param proxy_getter: 代理到实例参数标识 """ if not db: db = Pool.get_single_pool() self._debug = debug self._poolFlag = poolFlag self.__proxy_getter = proxy_getter self.sql = reader.Mapper() if poolFlag: # 连接池实例化 self._pool = db else: self._db = db self.init_mapper(path) # global pool # pool = custom_pool self.init_chain_options() def init_chain_options(self): self.__chain_lock = threading.Lock() setattr(self, '__chain_action_flag', False) setattr(self, '__chain_result', {}) def start(self): """ 开始执行链式调用,初始化相关参数 :return: """ if self.__chain_lock.locked(): from threading import ThreadError raise ThreadError('链式调用未完结') setattr(self, '__chain_action_flag', True) setattr(self, '__chain_result', {}) self.__chain_lock.acquire() return self def end(self, result_obj: type = None): """ 结束链式调用,返回结果,还原参数 :return: """ setattr(self, '__chain_action_flag', False) result = getattr(self, '__chain_result').copy() setattr(self, '__chain_result', {}) if result_obj: result_o = result_obj() for k, v in result.items(): setattr(result_o, k, v) else: result_o = result self.__chain_lock.release() return result_o def init_mapper(self, path): self.sql.open_dom(path) self._path = path self._file_part_mark = transfer_to_mapper_key(path) # self._file_part_mark = re.sub(REGIXSTR.get_mapper_file_name, '', path.split('/')[len(path.split('/')) - 1]) self._sqls = self.sql.get_tree()[self._file_part_mark] self.proxy() if self.__proxy_getter else None def proxy(self): """ 代理到getattr实例参数,更贴合逻辑习惯 :return: """ for k in self._sqls.keys(): setattr(self, k, CallableObject(self.exe_sql, k, self)) # 弃用 # def get_tables(self): # """ # 获取自身关联数据库表数据 # :return: # """ # self._cursor.execute("""SELECT table_name FROM information_schema.TABLES""") # self._db.commit() # result = utils.sort_result(self._cursor.fetchall(), self._cursor.description, []) # self.__db_tables_list = result def refreash_sqls(self): """ 刷新sql语句(动态调试sql) ps:慎用 :return: """ self.init_mapper(self._path) def check_conn(self): """ 检查数据库连接 :return: """ try: if not self._db: # 无连接,需要获取连接 if self._poolFlag: # 连接池 self._conn = self._pool.get_conn() self._db = self._conn.get_connect() else: # 单个连接理论上只执行一次,过后直接关闭 self._db = Connection() except Exception as e: logger.error(e) raise Exception('检查连接失败') def set_cursor(self): """ # 初始化指针(如果不存在指针) :return: """ if not self._cursor: self._cursor = self._db.cursor() # self.get_tables() def get_cursor(self): return self._cursor def get_sql(self, methodName: str, pageInfo, args=()): """ # 获取sql语句(包含处理) :param methodName: 结果集代号 :param pageInfo: 分页标识 :param args: 结果集映射语句参数集 :return: """ # 单独连接实例化 # 判断是否存在子节点 if methodName not in self._sqls: logger.error('没有该方法!method: ' + str(methodName)) raise Exception('没有该方法!method:' + str(methodName)) _sql = self.sql.get_executable_sql(self._sqls, methodName, args) # 判断是否分页(总开关) if self._page: # 开启之后该实例所有语句都认为是 需要分页 # 慎用!!!! # 分页 _sql = _sql + 'limit ' + str(self._pageNum * self._pageSize) + ',' + str(self._pageSize) if pageInfo: # 分页 _sql = _sql + PageHelper.depkg_page_info(pageInfo) # 判断是否骨架拼接 if args: _sql = self.translate_sql_bond(_sql, args) # 去除注释与空格,换行等 __sql = re.sub(REGIXSTR.get_at_least_one_blank, ' ', re.sub('', ' ', _sql)) return __sql def translate_sql_bond(self, _sql: str, args): """ 转义sql结果集骨架 :param _sql: 结果集原始骨架 :param args: 骨架参数集合 :return: 转义后的sql语句 """ result_sql = _sql # 检查骨架实参传入类型,并作不同处理 if type(args) is type(()): if re.match(REGIXSTR.mapper_bound_field_check, _sql): logger.error('骨架与参数不匹配') raise Exception('骨架与参数不匹配') result_sql = _sql % args[:] elif type(args) is type({}): for key in args.keys(): reg_str = r'(\#|\$|\!|\@|\?)\{' + str(key) + '\}' if not re.search(reg_str, _sql): ''' 此处有几种情况: 1.语句骨架不存在该key的空位(多余参数) 2.骨架参数与骨架不对应(多余空位) ''' pass else: # 转义参数值,防止sql注入 e_value = escape(args[key]) # 转义骨架参数 result_sql = re.sub('\$\{' + str(key) + '\}', str(e_value), result_sql) # 转义字符参数 result_sql = re.sub('\#\{' + str(key) + '\}', '\'' + str(e_value) + '\'', result_sql) # 转义近似参数 # 左近似 result_sql = re.sub('\!\{' + str(key) + '\}', '\'%' + str(e_value) + '\'', result_sql) # 右近似 result_sql = re.sub('\@\{' + str(key) + '\}', '\'' + str(e_value) + '%\'', result_sql) # 全近似 result_sql = re.sub('\?\{' + str(key) + '\}', '\'%' + str(e_value) + '%\'', result_sql) # 多余空位检查 if re.search('(\#|\$|\!|\@|\?)\{' + str(key) + '\}', result_sql): logger.error('存在无法配对的骨架参数') raise Exception('存在无法配对的骨架参数') else: try: result_sql = _sql % args[:] except Exception as e: logger.error(e + '\n') raise e return result_sql def set_page(self, pageNum: str, pageSize: str): """ # 设定分页信息 :param pageNum:页号(从0开始) :param pageSize: 页容(>0) :return: """ self._pageNum = pageNum self._pageSize = pageSize self._page = True return self def initial_page(self): """ 重置结果集实例的分页信息 :return: """ self._page = False return self def exe_sql_obj_queue(self, queue_obj={}) -> dict: # 批量执行语句(整体版) """ queue_obj中key为方法名,value为参数 注意!!!! 对于一个业务来说,一个sql方法只使用一次(因为有内部数据缓存) 若其中有重复方法,建议用分割版 """ if queue_obj: methods = list(queue_obj.keys()) args = list(queue_obj.values()) return self.exe_sql_queue(method_queue=methods, args_queue=args) else: logger.error('queue_obj参数不正确') raise Exception('queue_obj参数不正确') def exe_sql_queue(self, method_queue=[], args_queue=[]) -> dict: # 批量执行语句(拆分版) """ method_queue中存放顺序执行的sql方法名[str] args_queue中存放对应下标方法的参数元组[()] 若其中包含select无条件参数语句,请用空元组()占位 """ self.lock.acquire(blocking=True) result = {} # 参数检查 if not method_queue: logger.error('语句方法为空') raise Exception('语句方法为空') return if not args_queue: logger.error('语句参数列表为空') raise Exception('语句参数列表为空') return self.check_conn() self.set_cursor() try: # 开启事务 # 批量取语句(以方法名为准,多于参数队列元素将丢弃) while method_queue: method = method_queue.pop(0) args = args_queue.pop(0) """ 对于增改查来说,并不需要分页,参数列表是必须的 """ _sql = self.get_sql(methodName=method, args=args, pageInfo=None) # 执行sql语句 exc_result = self._cursor.execute(_sql) ''' 尝试执行语句成功后会解析结果集 ''' if re.match(REGIXSTR.select_sql_check, _sql): data = self._cursor.fetchall() description = self._cursor.description else: data = [[self._cursor.rowcount]] description = [['row_count']] current_result = utils.sort_result(data, description, []) # 调试模式打印语句 if self._debug: print_debug(methodName=method, args=args, sql=_sql, result=self._cursor.rowcount) # 事务提交(pymysql要求除查询外所有语句必须手动提交) if method in result: result[method + str(len(result))] = current_result else: result[method] = current_result except Exception as e: logger.error('SQL错误: %s , 语句为: %s' % (str(e), _sql)) self._db.rollback() logger.error('事务回滚' + str(method_queue)) raise e else: self._db.commit() logger.info('事务提交' + str(method_queue)) finally: self.lock.release() # 关闭连接 self.close() return result def exe_sql(self, methodName='', pageInfo=None, args=()) -> list: """ # 执行单条语句,返回结果列表(select more) # 防报错参数设定默认值 :param methodName: 结果集映射代号 :param pageInfo: 结果集分页信息 :param args: 结果集查询参数 :return: ==> 查询结果 ==> 有效行数 """ result = self.exe_sql(methodName=methodName, pageInfo=pageInfo, args=args) if isinstance(result, list): # 列表类型执行取值 if 1 < len(result): # 多个结果 logger.error('过多结果') raise Exception('过多结果') elif 0 == len(result): return None else: return result[0] else: return result def close(self): """ (单个连接):关闭数据库连接 (连接池):归还连接 :return: """ # print(id(self._pool)) self._cursor.close() if self._poolFlag: # 为连接池定义 self._pool.close_conn(self._conn) # 归还连接后清除指针 self._cursor = None self._db = None self._conn = None else: # 为单独连接定义 self._db.close() def insert_to_update_dispacther(self, millionSecond): """ # 定义插入更新调度方法 :param millionSecond: 更新频率 :return: """ if isinstance(millionSecond, int): w_time = millionSecond pass else: try: w_time = int(millionSecond) except Exception as e: logger.error(e) # 此处为增量更新代码 ''' 临时思路 1.设定定时间隔 2.传入当前语句对象 3.内部压缩保存binlog 4.定时完毕重新获取语句,获取新语句对象binlog 5.对比binlog 5.1若更新后binog无差异则不作处理 5.2若存在差异,替换语句对象 ''' self._bin_cache = binlog.BinCache(self._path) # 添加变更处理 self._bin_cache.set_false_fun(self.refreash_sqls) # 调度器添加任务 Schued.sech_obj(fun=self._bin_cache.chk_diff, delay=w_time).enter() """ 细粒度sql语句执行组 """ def execute(self, method_name: str, pageInfo=None, args=()): if self.lock.locked(): raise BlockingThreadError('还有事务尚未提交!!!') self.lock.acquire(blocking=True) self._cursor.execute(sql) def commit(self): self._db.commit() self.lock.release() def rollback(self): self._db.rollback() self.lock.release() def c_prop(clz: type, prop_name: str, prop_value=None, use_setter: bool = True, use_getter: bool = True): """ 生成类内属性方法 :return: """ in_setter = None in_getter = None setattr( clz, '_' + prop_name, prop_value, ) if use_setter: in_setter = c_prop_setter(prop_name) if use_getter: in_getter = c_prop_getter(prop_name) setattr( clz, prop_name, property(in_getter, in_setter) ) return clz def c_prop_setter(prop_name: str) -> classmethod: """ 生成setter方法 :return: """ setter_template = compile( 'def setter_' + prop_name + '(self,value):' + '\n\tself._' + prop_name + ' = value' + '', 'exec' ) setter_function_code = [c for c in setter_template.co_consts if isinstance(c, types.CodeType)][0] setter_method = types.FunctionType(setter_function_code, {}) return setter_method def c_prop_getter(prop_name: str) -> classmethod: """ 生成getter方法 :return: """ getter_template = compile( 'def getter_' + prop_name + '(self):' + '\n\treturn self._' + prop_name, '', 'exec' ) getter_function_code = [c for c in getter_template.co_consts if isinstance(c, types.CodeType)][0] getter_method = types.FunctionType(getter_function_code, {}) return getter_method def set_update_round(obj: CurObj, second: int): """ 设置结果集映射实例定时更新 :param obj: 结果集映射实例 :param second: 更新频率 :return: """ if isinstance(obj, CurObj): obj.insert_to_update_dispacther(second) else: logger.error('类型错误!!!!') raise Exception('类型错误!!!!') setObjUpdateRound = set_update_round def print_debug(methodName: str, sql: str, args: dict, result: list): """ # 调试模式下的语句信息打印 :param methodName: 结果集映射实例的结果集代号 :param sql: 结果集生成语句 :param args: 结果集实例执行参数 :param result: 结果集实例执行结果 :return: """ print('METHOD:==>' + methodName) print('SQL:=====>' + sql) print('PARAMS:==>' + str(args)) if isinstance(result, list): if result and result[0]: # 拿出列名 print('ROWS:====>' + str(list(result[0].keys()))) print('RESULT:==>' + str(list(result[0].values()))) for r in result[1:]: print('=========>' + str(list(r.values()))) else: print('ROWS:====>None') print('RESULT:==>None') if isinstance(result, int): print('ROWS:====>', result) def whereCause(args: dict) -> str: """ 将字典转换为sql条件语句 :param args: 字典(条件字典) :return: sql条件语句 """ cache = [] for key in args.keys(): cache.append(str(key) + '=' + str(args[key])) return 'WHERE ' + re.sub('\[|\]|\\\"|\\\'', '', re.sub(',', ' AND ', str(cache))) update_round = setObjUpdateRound where = whereCause __all__ = [ 'update_round', 'where', 'CastableObject' ] # 检测是否开启sql热更新模式 if properties.pydc_xmlupdate_sech: Schued.start_sech() class CallableObject: """ 可执行载体类 """ def __init__(self, callable_entity, execution_body: str, action_target: CurObj = None): assert callable(callable_entity), 'unknown entity' self.__callable = callable_entity self.__execution = execution_body self.__action_entity = action_target def __call__(self, *args, **kwargs): if getattr(self.__action_entity, '__chain_action_flag'): action_result = getattr(self.__action_entity, '__chain_result') if self.__execution in action_result: pre_action_result = action_result[self.__execution] \ if isinstance(action_result[self.__execution], dict) \ else {'0': action_result[self.__execution]} pre_action_result[str(len(pre_action_result))] = \ self.__callable(methodName=self.__execution, pageInfo=None, args=kwargs) action_result[self.__execution] = pre_action_result else: # 暂定为exe_sql方法体 action_result[self.__execution] = self.__callable(methodName=self.__execution, pageInfo=None, args=kwargs) setattr(self.__action_entity, '__chain_result', action_result) return self.__action_entity else: return self.__callable(methodName=self.__execution, pageInfo=None, args=kwargs) PK!oh dophon_db/mysql/PageHelper.py# coding: utf-8 import re from dophon_logger import * PAGE_NUM_REG_STR = '.*[nN][uU][mM].*' PAGE_SIZE_REG_STR = '.*[nN][uU][mM].*' logger = get_logger(DOPHON) """ sql分页工具(自带正则寻值) (不区分大小写) *num* ----->页码 *size* ------>页容 待实现: 1.参照mybatis的pagehelper author:CallMeE date:2018-06-01 """ logger.inject_logger(globals()) # 打包分页信息 def pkg_page_info(page_num=1, page_size=1, page_model=[]): if not page_model: return {'page_num': locals()['page_num'], 'page_size': locals()['page_size']} else: page_info = {} # 选取对应key写入数据 for key in page_model: if re.match(PAGE_NUM_REG_STR, string=key): page_info[key] = page_num if re.match(PAGE_SIZE_REG_STR, string=key): page_info[key] = page_size return page_info # 解包分页信息 def depkg_page_info(page_info: dict): for key in page_info: if re.match(PAGE_NUM_REG_STR, string=key): _page_num = page_info[key] if re.match(PAGE_SIZE_REG_STR, string=key): _page_size = page_info[key] return 'limit ' + str( 0 if int(_page_num) * int(_page_size) < 0 else int(_page_num) * int(_page_size)) + ',' + str( _page_size) def fix_page_info(page_info: dict): """ 修正分页信息 :param page_info: :return: """ for key in page_info: if re.match(PAGE_NUM_REG_STR, string=key): _page_num = page_info[key] if re.match(PAGE_SIZE_REG_STR, string=key): _page_size = page_info[key] return {'page_num': _page_num, 'page_size': _page_size} # 分页装饰器 def pkg_pageobj(f): """ 封装结果集为分页信息结果集 -> Pages :param f: :return: """ # print(f) def f_args(*args, **kwargs): # 执行查询 result = f(*args, **kwargs) # 结果校验 if isinstance(result, list): # 分页信息检查 if 'pageInfo' in kwargs and kwargs['pageInfo']: page_obj = fix_page_info(kwargs['pageInfo']) page_obj['list'] = result return page_obj else: logger.error('不存在分页信息') return result else: logger.error('结果不符合分页策略') return f_args PK!"dophon_db/mysql/remote/__init__.py# coding: utf-8 from dophon_db.mysql.remote import Cell """ 远程xml模块工厂 author:CallMeE date:2018-06-01 # 开启远程xml推荐导入本模块获取实例 # import mysql.remote as remote """ # 工厂模式获取远程细胞实例 def get_cell(file_name, remote_path, read_only=False): return Cell.get_cell(file_name=file_name, remote_path=remote_path, read_only=read_only)PK!H9dophon_db/mysql/remote/Cell.py# coding: utf-8 from urllib import request import uuid, os, time, stat from dophon_db import mysql from dophon_db.mysql.binlog import Schued as schued from dophon_logger import * logger = get_logger(DOPHON) """ 远程xml映射实例模板 author:CallMeE date:2018-06-01 # 开启远程xml需要导入以下模块 # import mysql.remote as remote demo: remote_cell = remote.get_cell('ShopGoodsMapper.xml', remote_path='http://127.0.0.1:8400/member/export/xml/ShopGoodsMapper.xml') obj1 = getDbObj(remote_cell.getPath(), debug=True) 或者 obj1 = getDbObj(remote_cell, debug=True) 注意!!! read_only设置远程xml文件是否为只读,注意防止与自动增量更新冲突 """ logger.inject_logger(globals()) class Cell(): def __init__(self, file_name='', remote_path='', read_only=False): self._file_name = file_name self._remote_path = remote_path self._uid = uuid.uuid5(uuid.NAMESPACE_DNS, file_name) # 创建临时文件夹(安全性考虑,采用多层目录) self._file_path = mysql.project_path + sort_path(self._uid) # 检查路径 if os.path.exists(self._file_path): pass else: os.makedirs(self._file_path) while True: # 下载远程文件 try: response = request.urlretrieve(url=remote_path, filename=self._file_path + '/' + self._file_name) logger.info('加载远程mapper:' + response[0]) # 放置路径 self._abs_path = response[0] if read_only: self.lock_to_read() break except Exception as e: logger.error(e) logger.error('连接远程计算机失败,请检查连接,3秒后重试(' + str(id(self)) + ')') time.sleep(3) # 重新加载文件 def reload_file(self): # 放开写入权限 if self.is_only_read(): self.unlock_to_read() response = request.urlretrieve(url=self._remote_path, filename=self._file_path + '/' + self._file_name) self.lock_to_read() else: response = request.urlretrieve(url=self._remote_path, filename=self._file_path + '/' + self._file_name) logger.info('加载远程mapper:' + response[0]) # 放置路径 self._abs_path = response[0] # 链式调用(非必需) return self # 进入调度定时更新文件 def reload_file_round(self, minute): schued.sech_obj(self.reload_file, minute * 60).enter() # 链式调用(非必需) return self # 获取下载文件路径 def getPath(self): return self._abs_path # 另一种方式获取路径 def __str__(self): return self.getPath() # 锁定文件为只读 def lock_to_read(self): os.chmod(self._abs_path, stat.S_IREAD) # 解除文件只读 def unlock_to_read(self): os.chmod(self._abs_path, stat.S_IWRITE) # 判断文件是否只读 def is_only_read(self): try: with open(self._abs_path, "r+") as fr: return False except IOError as e: if "[Errno 13] Permission denied" in str(e): return True else: logger.error(str(e)) return False def sort_path(path_str): result = '/.mapper' for s in str(path_str): result = result + '/' + s return result # 工厂模式获取实例 # def get_cell(file_name, remote_path, read_only): # return Cell(file_name=file_name, remote_path=remote_path, read_only=read_only) PK!Epdophon_db/mysql/single.pyfrom dophon_db import properties from .core import CurObj from dophon_db.mysql import PageHelper from dophon_db.mysql.remote.Cell import Cell import re from dophon_db.const import * from dophon_logger import * logger = get_logger(DOPHON) obj_manager = {} # 定义项目路径 project_path = properties.project_root class PageObj(CurObj): def pageable_exe_sql(self, methodName: str = '', pageInfo: dict = None, args=()) -> dict: result_list = self.exe_sql(methodName=methodName, pageInfo=pageInfo, args=args) if pageInfo: # 获取无分页语句 un_page_sql = self.get_sql(methodName=methodName, pageInfo=None, args=args) if re.match(REGIXSTR.select_sql_check, un_page_sql): # sql语句判断(非查询不作分页信息处理) un_page_sql = re.sub('^\\s*(s|S)(e|E)(l|L)(e|E)(c|C)(t|T)\\s+.+(f|F)(r|R)(o|O)(m|M)', 'SELECT COUNT(*) FROM', un_page_sql) conn = self._pool.getConn() connect = conn.getConnect() cursor = connect.cursor() cursor.execute(un_page_sql) count_items = cursor.fetchall()[0][0] connect.commit() self._pool.closeConn(conn) result = PageHelper.fix_page_info(pageInfo) import math result['total_page'] = math.ceil(count_items / result['page_size']) result['list'] = result_list return result return result_list def get_db_obj(path, debug: bool = False, auto_fix: bool = False, proxy_getter: bool = True): """ 获取数据表实例 :param path: xml文件路径 :param debug: 是否开启调试模式 :param auto_fix: 是否开启路径修复模式(损耗资源) <=== 待测试 :return: xml对应实例 """ # if not pool: # logger.error('连接池未定义') # raise Exception('连接池未定义') # if 0 >= pool.size(): # 配置属性生命周期过短,拟用__import__导入减轻内存废址 # prop = __import__('properties') # if hasattr(prop, 'pool_conn_num'): # pool.initPool(getattr(prop, 'pool_conn_num'), Connection.Connection) # else: # 初始5个连接 # pool.initPool(5, Connection.Connection) if isinstance(path, str): if auto_fix: pattern = re.sub(r'\\', r'\\\\', re.sub('/', '\\/', project_path)) if not re.search(pattern, path): r_path = str(project_path) + str(path) else: r_path = str(path) else: r_path = path elif isinstance(path, Cell): r_path = path.getPath() else: raise TypeError('结果集路径类型错误') # 数据语句对象改为单例模式获取 if r_path in obj_manager: return obj_manager[r_path] singleton_obj = CurObj(r_path, True, debug, proxy_getter=proxy_getter) obj_manager[r_path] = singleton_obj return singleton_obj def get_pg_obj(path, debug: bool = False, auto_fix: bool = False, proxy_getter: bool = True): """ 获取数据表实例 :param path: xml文件路径 :param debug: 是否开启调试模式 :param auto_fix: 是否开启路径修复模式(损耗资源) <=== 待测试 :return: xml对应实例 """ # if pool is None: # logger.error('连接池未定义') # raise Exception('连接池未定义') # if 0 >= pool.size(): # 配置属性生命周期过短,拟用__import__导入减轻内存废址 # prop = __import__('properties') # if hasattr(prop, 'pool_conn_num'): # pool.initPool(getattr(prop, 'pool_conn_num'), Connection.Connection) # else: # 初始5个连接 # pool.initPool(5, Connection.Connection) if isinstance(path, str): if auto_fix: pattern = re.sub(r'\\', r'\\\\', re.sub('/', '\\/', project_path)) if not re.search(pattern, path): r_path = str(project_path) + str(path) else: r_path = str(path) else: r_path = path elif isinstance(path, Cell): r_path = path.getPath() else: raise TypeError('结果集路径类型错误') # 数据语句对象改为单例模式获取 if r_path in obj_manager: return obj_manager[r_path] singleton_obj = PageObj(r_path, True, debug, proxy_getter=proxy_getter) obj_manager[r_path] = singleton_obj return singleton_obj getDbObj = get_db_obj getPgObj = get_pg_obj PK!'۰ $dophon_db/mysql/sql_util/__init__.pyimport types import re import os from pymysql import escape_dict, escape_sequence, escape_string escape_charset = 'utf-8' def escape(val): """ 转义值,防sql注入 :param val: :return: """ if isinstance(val, dict): return escape_dict(val, charset=escape_charset) elif isinstance(val, (list, set,)): return escape_sequence(val, charset=escape_charset) else: return escape_string(val) class CastableObject: """ 转换对象公共类 """ @classmethod def parse(clz, exe_result): return into_obj(exe_result=exe_result, clz=clz) def create_cast_obj(kwargs: dict, clz_name: str): """ 创造类 :param kwargs: :param clz_name: :return: """ class_obj = type(clz_name, (CastableObject,), {}) for k, v in kwargs.items(): setter_code = compile( 'def setter_' + k + '(self,value):' + '\n\tself._' + k + ' = value', '', 'exec' ) setter_function_code = [c for c in setter_code.co_consts if isinstance(c, types.CodeType)][0] setter_method = types.FunctionType(setter_function_code, {}) getter_code = compile( 'def getter_' + k + '(self):' + '\n\treturn self._' + k, '', 'exec' ) getter_function_code = [c for c in getter_code.co_consts if isinstance(c, types.CodeType)][0] getter_method = types.FunctionType(getter_function_code, {}) setattr( class_obj, '_' + k, v, ) setattr( class_obj, k, property(getter_method, setter_method) ) return class_obj() def into_obj(exe_result, clz: CastableObject): """ 结果集转换对象 :param exe_result: 结果集 :param clz: 转换对象类 :return: """ if isinstance(exe_result, (list,)): return [into_obj_single(item, clz) for item in exe_result] elif isinstance(exe_result, (dict,)): return into_obj_single(exe_result, clz) else: raise Exception('实例转换异常') def into_obj_single(exe_result_item: dict, clz: CastableObject): """ 结果集转换对象:单个 :param exe_result_item: 结果集 :param clz: 转换对象类 :return: """ result_obj = create_cast_obj(exe_result_item, getattr(clz, '__name__')) return result_obj def transfer_to_mapper_key(path: str): return re.sub('(\\..*)|:', '', '_'.join(path.split(os.sep))) PK!Vdophon_db/orm/__init__.pyfrom dophon_db.orm.manager_init import * from dophon_db import properties from dophon_logger import * logger = get_logger(DOPHON) logger.inject_logger(globals()) def init_orm(table_list=[], conn_kwargs={}): """ 初始化orm :return: """ if getattr(properties, 'db_cluster', []): logger.info('分片数据库初始化') return ClusterManager() else: return init_orm_manager(table_list, conn_kwargs) PK!%$.$. dophon_db/orm/db_obj/__init__.pyfrom dophon_db import Connection from dophon_db import Pool from dophon_db import utils import types from dophon_db.orm.db_obj.type_dict import db_type_python_dict from dophon_db.orm.db_obj.type_dict import set_check from dophon_db.orm.db_obj.function_class import * from dophon_db.orm.query_structor import Struct from dophon_logger import * logger = get_logger(DOPHON) logger.inject_logger(globals()) # 初始化表结构缓存 table_cache = {} def create_class(table_name: str, table_args: list): """ 创建数据表类 :param table_name: 表名 :param table_args: 表参数 :return: """ class_obj = type(table_name, ( SetAble, JoinAble, ValueAble, Struct, Parseable, Flushable, Pageable), {'__alias': table_name, 'table_map_key': table_name}) default_arg_list = [] for table_arg in table_args: # 获取表字段名以及属性 table_arg_field = table_arg['Field'] table_arg_type = table_arg['Type'] table_arg_null = table_arg['Null'] table_arg_key = table_arg['Key'] table_arg_default = table_arg['Default'] ''' 映射类属性方法组装 ''' setter_code = compile( 'def setter_' + table_arg_field + '(self,value):' + '\n\tself._' + table_arg_field + ' = value' + '\n\tself.append(\'' + table_arg_field + '\')', '', 'exec' ) setter_function_code = [c for c in setter_code.co_consts if isinstance(c, types.CodeType)][0] setter_method = set_check(table_arg_type)(types.FunctionType(setter_function_code, {})) getter_code = compile( 'def getter_' + table_arg_field + '(self):' + '\n\treturn self._' + table_arg_field, '', 'exec' ) getter_function_code = [c for c in getter_code.co_consts if isinstance(c, types.CodeType)][0] getter_method = types.FunctionType(getter_function_code, {}) setattr( class_obj, '_' + table_arg_field, table_arg_default if table_arg_null == 'YES' else None, ) setattr( class_obj, table_arg_field, property(getter_method, setter_method) ) default_arg_list.append(table_arg_field) # 设定默认字段列表(所有字段) setattr(class_obj, '__default_arg_list', default_arg_list) ''' 映射类固定方法组装 ''' # 重载直接调用运算符 callable_code = compile( 'def __call__(self,call_list):' + '\n\treturn self.get_fields(call_list)', '', 'exec' ) callable_function_code = [c for c in callable_code.co_consts if isinstance(c, types.CodeType)][0] callable_method = types.FunctionType(callable_function_code, {}) setattr( class_obj, '__call__', callable_method ) # 重载映射类别名运算符 alias_code = compile( 'def alias(self,alias_name:str):' + '\n\tself.__alias=alias_name' + '\n\treturn self', '', 'exec' ) alias_function_code = [c for c in alias_code.co_consts if isinstance(c, types.CodeType)][0] alias_method = types.FunctionType(alias_function_code, {}) setattr( class_obj, 'alias', alias_method ) # 重载初始化方法 def init_method(self, init_param=None): super(class_obj, self).__init__() self.read_from_dict(init_param) \ if isinstance(init_param, dict) else self.copy_from_obj(init_param) \ if isinstance(init_param, class_obj) else self setattr( class_obj, '__init__', init_method ) return class_obj class OrmManager: __table_cache = {} def __getattribute__(self, item): try: result = super(OrmManager, self).__getattribute__(item) except Exception as e: result = None # print(f'{item}==={result}==={type(result)}') if result is None: dynamic_init_tables(self, [item]) return eval(f'self.{item}') return result def add_orm_obj(self, table_obj: object): if 'table_name' in table_obj: # 添加表名单位 table_name = table_obj['table_name'] table_alias = table_obj['table_alias'] if table_obj['table_alias'] else table_obj['table_name'] # 编译表名属性方法(property) getter_module_code = compile( 'def ' + table_obj['table_name'] + '(self):\n\treturn self._' + table_obj['table_name'], '', 'exec' ) function_code = [c for c in getter_module_code.co_consts if isinstance(c, types.CodeType)][0] getter_method = types.FunctionType(function_code, {}) # 编译获取管理器连接池方法(property) pool_getter_module_code = compile( 'def pool_getter(self):\n\treturn self.connection_pool', '', 'exec' ) pool_getter_function_code = [c for c in pool_getter_module_code.co_consts if isinstance(c, types.CodeType)][ 0] pool_getter_method = types.FunctionType(pool_getter_function_code, {}) # 获取表结构 table_arg = table_obj['table_obj'] if not search_class_by_name(table_alias): # 组装新类 table_class = create_class(table_name, table_arg) save_cache(table_alias, table_class) else: table_class = get_cache(table_alias) # 植入类内 setattr(OrmManager, '_' + table_name, table_class) setattr(OrmManager, table_name, property(getter_method)) # 植入类内 setattr(table_class, 'pool_getter', pool_getter_method) setattr(table_class, 'connection_pool', self.connection_pool) self.__table_cache[table_obj['table_name']] = True else: logger.error('插入对象异常') raise Exception('插入对象异常') def has_table(self, table_name: str): return table_name in self.__table_cache def init_pool_in_manager(manager, conn_kwargs): """ 初始化对象管理器连接池 :param manager: :param conn_kwargs: :return: """ # 组装连接池属相相关 setattr(manager, '_connection_pool', Pool.get_pool(conn_kwargs)) # 编译表名属性方法(property) getter_module_code = compile( 'def connection_pool(self):\n\treturn self._connection_pool', '', 'exec' ) function_code = [c for c in getter_module_code.co_consts if isinstance(c, types.CodeType)][0] getter_method = types.FunctionType(function_code, {}) # 植入类内 setattr(OrmManager, 'connection_pool', property(getter_method)) def init_tables_in_db(manager: OrmManager, tables: list = [], conn_kwargs: dict = {}): logger.info('数据库全表ORM初始化开始' if not tables else '数据表' + str(tables[:]) + 'ORM初始化开始') connect = Connection.Connection(**conn_kwargs).get_connect() cursor = connect.cursor() cursor.execute('SHOW TABLES') connect.commit() # 整理数据表名列表 for tup_item in cursor.fetchall(): tup_item_name = tup_item[0] if '__host' in conn_kwargs and '__port' in conn_kwargs and '__database' in conn_kwargs: tup_item_alias = '-'.join( [conn_kwargs.get('__host'), str(conn_kwargs.get('__port')), conn_kwargs.get('__database'), tup_item[0]]) else: tup_item_alias = tup_item_name if tables: if tup_item_name in tables: init_table_param(tup_item_name, manager, table_alias=tup_item_alias, conn_kwargs=conn_kwargs) else: init_table_param(tup_item_name, manager, table_alias=tup_item_alias, conn_kwargs=conn_kwargs) connect.close() logger.info('数据库ORM初始化完毕') def dynamic_init_tables(manager: OrmManager, tables: list = [], conn_kwargs: dict = {}): """ 动态初始化表 :param manager: :param tables: :param conn_kwargs: :return: """ for table in tables: logger.info(f'{manager}动态初始化数据表:({tables})') conn_dict = conn_kwargs if conn_kwargs else manager.__dict__["_connection_pool"].cache_conn_kwargs connect = Connection.Connection(**conn_dict).get_connect() cursor = connect.cursor() # cursor.execute('SHOW TABLES') cursor.execute( f"SELECT * FROM information_schema.TABLES WHERE TABLE_SCHEMA=(SELECT database() AS db) AND TABLE_NAME='{table}';") connect.commit() connect.close() __table_exist = cursor.fetchall() # print(__table_exist) if __table_exist: # 整理数据表名列表 for tup_item in __table_exist: # 校验是否存在表名 tup_item_name = table # 执行数据表初始化 if '__host' in conn_kwargs and '__port' in conn_kwargs and '__database' in conn_kwargs: tup_item_alias = '-'.join( [conn_kwargs.get('__host'), str(conn_kwargs.get('__port')), conn_kwargs.get('__database'), tup_item[0]]) else: tup_item_alias = tup_item_name if tables: if tup_item_name in tables: init_table_param(tup_item_name, manager, table_alias=tup_item_alias, conn_kwargs=conn_kwargs) else: init_table_param(tup_item_name, manager, table_alias=tup_item_alias, conn_kwargs=conn_kwargs) else: raise KeyError(f'{manager}无法实例化的表名:{table}') def init_table_param(table_name, manager: OrmManager, table_alias: str = '', conn_kwargs: dict = {}): conn_dict = conn_kwargs if conn_kwargs else manager.__dict__["_connection_pool"].cache_conn_kwargs connect = Connection.Connection(**conn_dict).get_connect() cursor = connect.cursor() cursor.execute('DESC ' + table_name) connect.commit() titles = cursor.description values = cursor.fetchall() result = utils.sort_result(values, titles, []) table_obj = { 'table_alias': table_alias, 'table_name': table_name, 'table_obj': result } manager.add_orm_obj(table_obj) connect.close() def save_cache(table_name: str, table_class: object): """ 将orm映射类写入缓存,减少重复创建类 :param table_class: orm映射类 :return: """ logger.info('保存映射缓存: %s %s %s', table_name, ' => ', str(table_class)) table_cache[table_name] = table_class def get_cache(table_name: str) -> object: """ 根据表名获取orm映射类缓存 :param table_name: 映射表名 :return: orm映射类 """ logger.info('获取映射缓存: %s', table_name) return table_cache[table_name] def search_class_by_name(table_name: str) -> bool: """ 根据表名查找缓存 :param table_name: 映射表名 :return: 是否命中缓存 """ logger.info('检查映射缓存: %s', table_name) return table_name in table_cache PK!3G 5/5//dophon_db/orm/db_obj/function_class/__init__.pyimport re """ 功能特性类集合 """ __all__ = ['WhereAble', 'ValueAble', 'SetAble', 'OrmObj', 'JoinAble', 'Parseable', 'Flushable', 'Pageable'] class OrmObj(object): """ 表映射基础类(标注作用) 待添加:sql关键字保护策略算法 """ pass class FieldsCallable(OrmObj): """ 可取出内置数据集合的功能类 """ def __init__(self): """ 初始化类功能实现的数据域 """ if not hasattr(self, '__field_callable_list'): setattr(self, '__field_callable_list', []) self.__field_callable_list = [] def f_c_l_flush(self): """ 清洗内置数据集合 :return: """ setattr(self, '__field_callable_list', []) setattr(self, '_FieldsCallable__field_callable_list', []) def append(self, field_name: str): """ 内部方法:记录字段名 :param field_name: :return: """ f_list = getattr(self, '__field_callable_list') f_list.append(field_name) setattr(self, '__field_callable_list', f_list) self.__field_callable_list.append(field_name) def get_fields(self, f_list: list = []) -> dict: """ 获取字段映射 :param list: :return: """ cache = {} if hasattr(self, '__field_callable_list') and len(getattr(self, '__field_callable_list')) > 0: fs_name = getattr(self, '__field_callable_list') elif self.__field_callable_list: fs_name = self.__field_callable_list elif hasattr(self, '__default_arg_list'): fs_name = getattr(self, '__default_arg_list') else: fs_name = f_list for f_name in fs_name: if hasattr(self, f_name): if f_list: if f_name in f_list: cache[f_name] = getattr(self, f_name) continue else: cache[f_name] = getattr(self, f_name) else: print('警告:表(', getattr(self, 'table_map_key'), ')缺失字段(', f_name, '),表映射存在风险') return cache def get_field_list(self, f_list: list = []) -> list: """ 获取字段列表 :param list: :return: """ cache_list = list(self.get_fields(f_list=f_list).keys()) cache = [] for f_name in cache_list: cache.append( getattr(self, '__alias') + '.' + f_name ) return cache def fields(self, fields: list = []): fields_list = self.get_field_list(fields) cache = re.sub('\[|\]|\\\'|\\\"', '', str(fields_list)) return cache class ValueAble(FieldsCallable): """ 可赋值化功能类 """ def __init__(self): """ 初始化类功能实现的数据域 """ FieldsCallable.__init__(self) def value_cause(self, args: dict) -> list: """ 将字典键值分离 :param args: 字典对象 :return: """ keys = re.sub('\\\'', '', re.sub('\[', '(', re.sub('\]', ')', str(list(args.keys()))))) values = re.sub('\[', '(', re.sub('\]', ')', str(list(args.values())))) return [ keys , values ] def values(self, fields: list = []) -> str: """ 获取条件赋值语句 :param fields: 键值列表 :return: """ result = self.value_cause(self(fields)) return result[0] + ' VALUES ' + result[1] class WhereAble(FieldsCallable): """ 可条件化功能类 """ def __init__(self): """ 初始化类功能实现的数据域 """ FieldsCallable.__init__(self) def where_cause(self, args: dict, be_alias: bool = True) -> str: """ 将字典转换为sql条件语句 :param args: 字典(条件字典) :return: sql条件语句 """ cache = [] for key in args.keys(): self_table_alias = \ (getattr(self, '__alias') + '.' if be_alias and getattr(self, '__alias') != getattr(self, 'table_map_key') else '') cache.append( self_table_alias + str(key) + '=' + (str( args[key]) if isinstance(args[key], int) or isinstance(args[key], float) else ( '{' + str(args[key]) + '}'))) return re.sub('\{|\}', '\'', re.sub('\[|\]|\\\"|\\\'', '', re.sub(',', ' AND ', str(cache)))) def where(self, fields: list = [], be_alias: bool = True) -> str: """ 获取条件执行语句 :param fields:条件列表 :return: """ if len(fields) > 0 or ( hasattr(self, '__field_callable_list') and len(getattr(self, '__field_callable_list')) > 0): args = getattr(self, 'get_fields')(fields) else: return '' return ' WHERE ' + getattr(self, 'where_cause')(args, be_alias) class SetAble(WhereAble): """ 可更新化条件类 """ def __init__(self): """ 初始化类功能实现的数据域 """ FieldsCallable.__init__(self) def set(self, fields: list = []) -> str: """ 获取更新执行语句 :param fields: 更新参数列表 :return: """ args = self.get_fields(fields) return ' SET ' + re.sub('AND', ',', self.where_cause(args)) class JoinAble(OrmObj): """ 可关联化功能类 """ def __init__(self): """ 初始化关联列表 """ if not hasattr(self, '__join_list'): setattr(self, '__join_list', []) self.__join_list = [] def left_join(self, target, on_left_field: list, on_right_field: list): """ 左关联功能 :param target: 关联实例 :return: 自身实例 """ if not on_left_field or not on_right_field or len(on_left_field) > len(on_right_field): raise Exception('关联参数异常') if isinstance(target, JoinAble): setattr(self, '__join_list', []) getattr(self, '__join_list').append({ 'target': target, 'left_field': on_left_field, 'right_field': on_right_field }) return self else: raise Exception('关联对象不支持!!!') def right_join(self, target, on_left_field: list, on_right_field: list): """ 右关联功能 :param target: 关联实例 :return: 自身实例 """ if isinstance(target, JoinAble): target.left_join(self, on_left_field, on_right_field) return self else: raise Exception('关联对象不支持!!!') def union(self): pass def exe_join(self) -> str: self_table_alias = \ getattr(self, '__alias') if getattr(self, '__alias') != getattr(self, 'table_map_key') else '' result = [ getattr(self, 'table_map_key') + (' AS ' if getattr(self, '__alias') != getattr(self, 'table_map_key') else '') + self_table_alias ] for join_obj in getattr(self, '__join_list'): # 获取关联对象 obj = join_obj['target'] join_obj_table_alias = \ getattr(obj, '__alias') if getattr(obj, '__alias') != getattr(obj, 'table_map_key') else '' result.append(getattr(obj, 'table_map_key') + (' AS ' if getattr(obj, '__alias') != getattr(obj, 'table_map_key') else '') + join_obj_table_alias) # 获取关联键 left_field = join_obj['left_field'] right_field = join_obj['right_field'] # 以关联左键为准 on_fields_pair_sep = ' AND ' on_fields_pair = [] for index in range(len(left_field)): l_field = left_field[index] on_fields_pair.append( self_table_alias + '.' + l_field + ' = ' + join_obj_table_alias + '.' + right_field[index]) return ' LEFT JOIN '.join(result) + ' ON ' + on_fields_pair_sep.join(on_fields_pair) class Parseable(OrmObj): def read_from_dict(self, d: dict): """ 读取字典生成orm对象 :param d: :return: """ for key in d.keys(): if hasattr(self, key): setattr(self, key, d[key]) else: raise Exception('无法转换为' + str(getattr(self, 'table_map_key')) + '类型') return self def copy_to_obj(self, clz: OrmObj): res_obj = clz() for name in dir(self): if re.search('^_.*', name): continue if name not in dir(res_obj): raise Exception('无法复制的类型') setattr(res_obj, name, getattr(self, name)) return res_obj def copy_from_obj(self, obj: OrmObj): for name in dir(obj): if re.search('^_.*', name): continue if name not in dir(self): raise Exception('无法复制的对象') setattr(self, name, getattr(obj, name)) class Flushable(OrmObj): def flush(self): """ 清洗对象内部数据 :return: """ # 清洗参数列表缓存 self.f_c_l_flush() for name in dir(self): if name.startswith('__') and name.endswith('__'): continue if name.startswith('_') \ and not name.startswith('__') \ and not re.search('[A-Z]', name) \ and getattr(self, name): setattr(self, name, 'none') class Pageable(OrmObj): def limit(self, start: int, end: int): """ 写入上下标查询信息 :param start: 记录起始位置 :param end: 记录条数 :return: """ assert start > -1, '下标越界!' if end: assert end > -1, '上标越界!' setattr(self, '__limit_start', start) setattr(self, '__limit_end', end) setattr(self, '__execuable_limit_sql', f""" LIMIT {start}"""f""",{end}""" if end else '') return self def clear_limit_info(self): """ 清除上下标信息 :return: """ delattr(self, '__limit_start') delattr(self, '__limit_end') delattr(self, '__execuable_limit_sql') def page(self, page_num: int, page_size: int, pages_size: int = 10): """ 写入分页信息 :param page_num: :param page_size: :return: """ assert page_num > -1, '页码错误' assert page_size > 0, '页容错误' setattr(self, '__page_num', page_num) setattr(self, '__page_size', page_size) setattr(self, '__pages_size', pages_size) start = page_num * page_size end = page_size setattr(self, '__page_switch', True) return self.limit(start, end) def clear_page_info(self): """ 清除分页信息 :return: """ delattr(self, '__page_num') delattr(self, '__page_size') delattr(self, '__pages_size') setattr(self, '__page_switch', False) PK!X!dophon_db/orm/db_obj/type_dict.pyfrom datetime import datetime import sys import re import struct from dophon_logger import * logger = get_logger(DOPHON) logger.inject_logger(globals()) db_type_python_dict = { # 字符串类型 'char': { 'type': str, 'str_name': 'str', 'min_length': 0, 'max_length': 255 }, 'varchar': { 'type': str, 'str_name': 'str', 'min_length': 0, 'max_length': 65535 }, 'tinytext': { 'type': str, 'str_name': 'str', 'min_length': 0, 'max_length': 255 }, 'text': { 'type': str, 'str_name': 'str', 'min_length': 0, 'max_length': 65535 }, 'mediumtext': { 'type': str, 'str_name': 'str', 'min_length': 0, 'max_length': 16777215 }, 'longtext': { 'type': str, 'str_name': 'str', 'min_length': 0, 'max_length': 4294967295 }, 'enum': { 'type': tuple, 'str_name': 'tuple', 'min_length': 0, 'max_length': 65535 * 2 }, 'set': { 'type': set, 'str_name': 'set', 'min_length': 0, 'max_length': 64 * 8 }, # 日期类型 'date': { # 'type': datetime, 'type': str, 'str_name': 'datetime', 'min_length': 0, 'max_length': 65535 }, 'time': { 'type': datetime, 'str_name': 'datetime', 'min_length': 0, 'max_length': 65535 }, 'year': { 'type': datetime, 'str_name': 'datetime', 'min_length': 0, 'max_length': 65535 }, 'datetime': { 'type': datetime, 'str_name': 'datetime', 'min_length': 0, 'max_length': 65535 }, 'timestemp': { 'type': datetime, 'str_name': 'datetime', 'min_length': 0, 'max_length': 65535 }, # 浮点类型 'float': { 'type': float, 'str_name': 'float', 'min_length': 0, 'max_length': sys.maxsize }, 'double': { 'type': float, 'str_name': 'float', 'min_length': 0, 'max_length': sys.maxsize }, 'tinyint': { 'type': int, 'str_name': 'int', 'min_length': 0, 'max_length': 1 }, 'smallint': { 'type': int, 'str_name': 'int', 'min_length': 0, 'max_length': 2 }, 'mediumint': { 'type': int, 'str_name': 'int', 'min_length': 0, 'max_length': 3 }, # 整数类型 'int': { 'type': int, 'str_name': 'int', 'min_length': 0, 'max_length': 4 }, 'bigint': { 'type': int, 'str_name': 'int', 'min_length': 0, 'max_length': 8 } } # 计算数字字节数 def count_int_bytes(num: int): ''' 计算整形所占内存字节数 :param num: :return: ''' byte_num = int(len(re.sub('0b', '', bin(num))) / 8) + 1 return byte_num def count_str_bytes(string: str): ''' 计算字符串所占内存字节数 :param string: :return: ''' # byte_num = int(len(re.sub('0b', '', bin(int(string.encode().hex(), 16))))) + 1 byte_num = len(string) return byte_num def count_float_bytes(float_num: float): ''' 暂时弃用 :param float_num: :return: ''' byte_num = int(len(re.sub('0b', '', bin(int(hex(struct.pack(" 0: try: max_length = int(length_info) except Exception as e: logger.error(e) try: data_bytes = None if data_struct_info['type'] is int and isinstance(data, int): data_bytes = count_int_bytes(data) elif data_struct_info['type'] is str and isinstance(data, str): data_bytes = count_str_bytes(data) elif data_struct_info['type'] is datetime and isinstance(data, datetime): data_bytes = count_str_bytes(data.strftime('yyyy-MM-dd HH:mm:ss')) if data_bytes and data_bytes > min_length - 1 and data_bytes < max_length + 1: return True else: return False except: err_msg = '数据类型错误( data_type = ' + \ str(type(data)) + \ ' , db_type = ' + \ type_str + \ ' , required_type = ' + \ str(data_struct_info['type']) + ' ) ' logger.error(err_msg) raise Exception(err_msg) err_msg = '不支持的数据类型( data = ' + \ (type(data)) + \ ' ) ' logger.error(err_msg) raise Exception( err_msg ) def set_check(data_type): def fun(f): def arg(*args, **kwargs): value = args[1] if len(args) > 1 else kwargs['value'] if check_data(value, data_type): # 数据类型校验通过 pass else: try: # 尝试强制转换 db_type_python_dict[re.sub('([^a-zA-Z])','',data_type)]['type']( value ) except: err_msg = '数据类型校验不通过( data = ' + \ value + \ ' , data_type = ' + \ str(type(value)) + \ ' , db_type = ' + \ data_type + \ ' )' logger.error(err_msg) raise Exception( err_msg ) result = f(*args, **kwargs) return result return arg return fun PK!;} } dophon_db/orm/manager_init.pyfrom dophon_db import properties from dophon_db.orm import db_obj from dophon_db.utils import * from threading import Thread import time from hashlib import md5 import re from dophon_logger import * logger = get_logger(DOPHON) logger.inject_logger(globals()) manager_map = {} singleton_dbm = None class ManagerObject: pass def manager_map_to_object(): result = ManagerObject() for k, v in manager_map.items(): setattr(result, k, v) return result def init_orm_manager(table_list: list = [], conn_kwargs: dict = {}, orm_pre_load: bool = properties.orm_pre_load): global manager_map, singleton_dbm manager = db_obj.OrmManager() # print(conn_kwargs) db_obj.init_pool_in_manager(manager, conn_kwargs) if table_list or orm_pre_load: db_obj.init_tables_in_db(manager, table_list, conn_kwargs) else: pass # logger.info('对象映射改为运行时映射') if conn_kwargs: manager_label = conn_kwargs['alias'] manager_map[manager_label] = manager return manager_map_to_object() else: if singleton_dbm: return singleton_dbm singleton_dbm = manager # logger.info(manager_map) return manager def init_cluster_orm_manager(): global manager_map if manager_map: return manager_map db_cluster_info = get_db_cluster_info() for cluster_info in db_cluster_info: for cluster_name in cluster_info: logger.info('初始化分片%s数据库 : %s' % (cluster_name, '::'.join( [(str(re.sub('_', '', k)) + '<' + str(v) + '>') for k, v in cluster_info[cluster_name].items() if not re.match('.*(user|password).*', k)] ),)) # orm内部管理器初始化方式启动 # Thread(target=db_obj.init_tables_in_db, args=(manager,), # kwargs={'conn_kwargs': cluster_info[cluster_name]}).start() Thread(target=init_orm_manager, kwargs={ 'table_list': cluster_info[cluster_name]['table_list'], 'conn_kwargs': cluster_info[cluster_name] }).start() while len(manager_map) != len(db_cluster_info): # print(len(manager_map),'---',len(db_cluster_info)) time.sleep(1) return manager_map class ClusterManager: __map = {} def __init__(self): self.__map = init_cluster_orm_manager() def __getattr__(self, name): logger.info('获取相应对象管理') result = None print(self.__map) for k, v in self.__map.items(): # print(k,'---',v) if not result: if v.has_table(name): result = eval('v.' + name) else: try: result = eval('v.' + name) except KeyError as ke: # print(ke) pass if result: return result else: raise Exception('无法识别的表名') PK!i'i'dophon_db/orm/query_structor.pyimport math from dophon_db import utils from dophon_logger import * logger = get_logger(DOPHON) """ 查询语句结构映射 结构: select from where """ logger.inject_logger(globals()) def get_connection(target): """ 从连接池中获取连接 :return: """ return target.pool_getter().get_conn() class Selelct: """ 查询结构类 """ def before_select(self, fields_list: list, has_where: bool) -> str: result = f"""SELECT {(getattr(self, 'fields')(fields_list) if fields_list else ' * ')} FROM """f"""{( getattr(self, 'table_map_key') + ((' AS ' + getattr(self, '__alias')) if getattr(self, '__alias') != getattr(self, 'table_map_key') else '') if not hasattr(self, '__join_list') else (getattr(self, 'exe_join')()))}{( getattr(self, 'where')() if has_where else '')} {getattr(self, '__execuable_limit_sql') if hasattr(self, '__execuable_limit_sql') and hasattr(self, '__limit_start') else ''}""" return result.replace('\n', ' ') def select(self, fields: list = [], has_where: bool = True) -> list: """ 查询并获取该表结果集 :param fields: 列参 :return: 多条结果列表 """ sql = self.before_select(fields, has_where) logger.info('执行: %s', sql) result = [] connection = get_connection(self) cursor = connection.cursor() cursor.execute(sql) if not sql.startswith('select') and not sql.startswith('SELECT'): data = [[cursor.rowcount]] description = [['row_count']] else: data = cursor.fetchall() description = cursor.description connection.commit() result = utils.sort_result(data, description, result) # 清除当前分页信息 if hasattr(self, '__execuable_limit_sql') and hasattr(self, '__limit_start'): self.clear_limit_info() return self.page_filter(result) if getattr(self, '__page_switch', False) else result def select_one(self, fields: list = []) -> dict: """ 查询一条结果集 :param fields: 列参 :return: 单条结果集字典 """ if hasattr(self, '__field_callable_list') and len(getattr(self, '__field_callable_list')) > 0: # 存在可生成的查询条件 result = self.select(fields=fields) assert len(result) >= 0, '神奇的错误' assert len(result) <= 1, '过多结果集' return self.page_filter(result) if getattr(self, '__page_switch', False) else result else: logger.error('无法预料的唯一结果集,找不到查询过滤条件') raise Exception('无法预料的唯一结果集,找不到查询过滤条件') def select_all(self, fields: list = [], ignore_fields_warning: bool = True) -> list: """ 同select :param fields: :return: """ if not ignore_fields_warning and hasattr(self, '__field_callable_list') and len( getattr(self, '__field_callable_list')) > 0: logger.warning('警告:存在查询过滤条件 %s ', str(getattr(self, '__field_callable_list'))) result = self.select(fields=fields, has_where=False) return self.page_filter(result) if getattr(self, '__page_switch', False) else result def before_count(self, field_name: str, has_where: bool) -> str: count_str = getattr(self, 'fields')([field_name]) if field_name else ' * ' assert count_str, '无此列名' result = f"""SELECT COUNT({count_str}) AS count FROM """f"""{( getattr(self, 'table_map_key') + ((' AS ' + getattr(self, '__alias')) if getattr(self, '__alias') != getattr(self, 'table_map_key') else '') if not hasattr(self, '__join_list') else (getattr(self, 'exe_join')()))}{( getattr(self, 'where')() if has_where else '')} {getattr(self, '__execuable_limit_sql') if hasattr(self, '__execuable_limit_sql') and hasattr(self, '__limit_start') else ''}""" return result.replace('\n', ' ') def count(self, field: str = '', has_where: bool = True): """ 查询条数 :return: """ sql = self.before_count(field, has_where) logger.info('执行: %s', sql) result = [] connection = get_connection(self) cursor = connection.cursor() cursor.execute(sql) if not sql.startswith('select') and not sql.startswith('SELECT'): data = [[cursor.rowcount]] description = [['row_count']] else: data = cursor.fetchall() description = cursor.description connection.commit() result = utils.sort_result(data, description, result)[0] return result def page_filter(self, result: list) -> dict: """ 分页过滤器 注意: pages值由于python3问题为一个可迭代对象 :param result: :return: """ c_num = int(self.count()['count']) page_size = getattr(self, '__page_size', -1) pages_size = getattr(self, '__pages_size') page_num = getattr(self, '__page_num', 0) assert page_size > -1, '不存在分页信息' total_page = math.ceil((c_num / page_size)) # 清除分页信息 self.clear_page_info() return { 'list': result if isinstance(result, list) else [result], 'total': c_num, 'total_page': total_page, 'pages': list( range(page_num, (total_page if total_page <= (page_num + pages_size) else (page_num + pages_size))) ) } class Insert: """ 新增结构类 """ def before_insert(self): result = 'INSERT INTO ' + \ getattr(self, 'table_map_key') + ' ' + \ getattr(self, 'values')() return result def insert(self) -> int: """ 新增结果集 :return: 影响行数 [{'row_count': '0'}] """ sql = self.before_insert() logger.info('执行: %s', sql) result = [] connection = get_connection(self) cursor = connection.cursor() try: cursor.execute(sql) if not sql.startswith('select') and not sql.startswith('SELECT'): data = [[cursor.rowcount]] description = [['row_count']] else: data = cursor.fetchall() description = cursor.description connection.commit() result = utils.sort_result(data, description, result)[0]['row_count'] return int(result) except Exception as e: logger.error('%s', e) connection.rollback() return 0 class Update(): """ 更新结构类 """ def before_update(self, update: list, where: list): result = 'UPDATE ' + getattr(self, 'table_map_key') + \ ( (' AS ' + getattr(self, '__alias')) if getattr(self, '__alias') != getattr(self, 'table_map_key') else '' ) + \ getattr(self, 'set')(update) + \ getattr(self, 'where')(where) return result def update(self, update: list = [], where: list = []) -> int: """ 更新结果集 :param update: 更新列参 :param where: 条件列参 :return: 影响行数 """ sql = self.before_update(update, where) logger.info('执行: %s', sql) result = [] connection = get_connection(self) cursor = connection.cursor() try: cursor.execute(sql) if not sql.startswith('select') and not sql.startswith('SELECT'): data = [[cursor.rowcount]] description = [['row_count']] else: data = cursor.fetchall() description = cursor.description connection.commit() result = utils.sort_result(data, description, result)[0]['row_count'] return int(result) except Exception as e: logger.error('%s', e) connection.rollback() return 0 class Delete(): """ 删除结构类 """ def before_delete(self, where: list): result = 'DELETE FROM ' + getattr(self, 'table_map_key') + \ ' ' + getattr(self, 'where')(where, be_alias=False) return result def delete(self, where: list = []) -> int: """ 删除结果集 :param where: 条件列参 :return: 影响行数 """ sql = self.before_delete(where) logger.info('执行: %s', sql) result = [] connection = get_connection(self) cursor = connection.cursor() try: cursor.execute(sql) if not sql.startswith('select') and not sql.startswith('SELECT'): data = [[cursor.rowcount]] description = [['row_count']] else: data = cursor.fetchall() description = cursor.description connection.commit() result = utils.sort_result(data, description, result)[0]['row_count'] return int(result) except Exception as e: logger.error('%s', e) connection.rollback() return 0 class Struct(Selelct, Insert, Update, Delete): """ 查询语句结构类 """ PK!/  dophon_db/Pool.py# coding: utf-8 # 连接 from dophon_db import Connection import threading from dophon_db import properties import datetime from dophon_logger import * logger = get_logger(DOPHON) """ 连接池 author:CallMeE date:2018-06-01 """ logger.inject_logger(globals()) single_pool = None # 单例模式获取连接池 cluster_pool = None # 单例模式获取连接池 pool_action_log = {} lock = threading.Lock() # 全局线程锁 def record_support(f): def inner_method(*args, **kwargs): result = f(*args, **kwargs) obj_id = id(result) pool_action_log[obj_id] = datetime.datetime.now().timestamp() return result return inner_method def record_return(f): def inner_method(*args, **kwargs): obj_id = id(args[1]) if hasattr(properties, 'db_pool_exe_time') and getattr(properties, 'db_pool_exe_time'): logger.info( '用时:' + (datetime.datetime.now().timestamp() - pool_action_log[obj_id]) + '毫秒' ) f(*args, **kwargs) return inner_method def get_pool(conn_kwargs: dict = {}): return get_single_pool(conn_kwargs) def get_single_pool(conn_kwargs: dict = {}): global single_pool if single_pool: return single_pool pool = Pool() pool.init_pool(properties.pool_conn_num, Connection.Connection, conn_kwargs=conn_kwargs) single_pool = pool return single_pool class Pool(): _size = 0 cache_conn_kwargs = {} # 初始化连接池 def init_pool(self, num: int, Conn: Connection, conn_kwargs: dict = {}): _pool = [] self._Conn = Conn self.cache_conn_kwargs = conn_kwargs for item_c in range(num): # 遍历定义连接放入连接池 conn = Conn(**conn_kwargs) _pool.append(conn) self._pool = _pool self._size = num return self def __init__(self): logger.info(f'初始化连接池 => ({id(self)})') # 定义取出连接 @record_support def get_conn(self) -> Connection: __pool = self._pool if __pool: lock.acquire(blocking=True) currConn = __pool.pop(0) if currConn.test_conn(): # 连接有效 # 不作处理 pass else: logger.info('连接无效') currConn.re_conn() lock.release() return currConn else: # 连接数不足则新增连接 conn = Connection.Connection(**self.cache_conn_kwargs) self._pool.append(conn) return self.get_conn() # 定义归还连接 @record_return def close_conn(self, conn): self._pool.append(conn) # 定义查询连接池连接数 def size(self): return self._size # 定义释放所有连接 def free_pool(self): for conn in self._pool: conn.getConnect().close() PK!KYUU"U"dophon_db/reader/__init__.py# coding: utf-8 import xml.dom.minidom import re from copy import deepcopy import os """ xml读取工具 author:CallMeE date:2018-06-01 """ # 定义操作符 marks = { 'eq': '==', 'lt': '<', 'gt': '>', 'le': '<=', 'ge': '>=', 'not': '!=', 'has': 'HAS' } class Mapper: _data = {} def sort_tags(self, file: str, tree, name): tags = tree.getElementsByTagName(name) data = self._data file_list = file.split(os.sep) file_name = re.sub('(\\..*)|:', '', '_'.join(file_list)) if file_name not in data: data[file_name] = {} file_cache = data[file_name] for tag in tags: val = [] for node in tag.childNodes: if isinstance(node, xml.dom.minidom.Text): if re.sub('\s', '', node.data): val.append(re.sub('\n', '', node.data)) else: val.append(id(node)) attr = tag.getAttribute("id") id_bind_obj = {} id_bind_obj['main_sql'] = val # 获取条件标签 if_tags = self.sort_if_tags(tag) id_bind_obj['if'] = if_tags for_tags = self.sort_for_tag(tag) id_bind_obj['for'] = for_tags file_cache[attr] = id_bind_obj def open_dom(self, file): # 使用minidom解析器打开 XML 文档 DOMTree = xml.dom.minidom.parse(file) tags = DOMTree.documentElement self.alias_for = tags.getAttribute('for') if tags.hasAttribute('for') else None # 取出标签(增删查改) self.sort_tags(file, DOMTree, 'select') self.sort_tags(file, DOMTree, 'delete') self.sort_tags(file, DOMTree, 'insert') self.sort_tags(file, DOMTree, 'update') def get_tree(self): return self._data def sort_if_tags(self, tag): if_tags = tag.getElementsByTagName('if') tags_structer = {} for if_tag in if_tags: val = if_tag.childNodes[0].data attrs = if_tag.attributes attr_keys = attrs.keys() tag_struct = {} if attr_keys: if_key_index = 0 for k in attr_keys: if_key_struct = {} attr_value = str(if_tag.getAttribute(k)).split('|') attr_operator = marks.get(attr_value[0]) attr_operator_value = attr_value[1] if len(attr_value) > 1 else '' if attr_value: if_key_struct['_key'] = k if_key_struct['operator'] = attr_operator if_key_struct['_value'] = attr_operator_value if_key_struct['add_sql'] = val tag_struct[if_key_index] = if_key_struct if_key_index += 1 tags_structer[id(if_tag)] = tag_struct return tags_structer def sort_for_tag(self, tag): """ 处理循环标签(暂定使用文本元素替换标签元素) :param tag: :return: """ for_tags = tag.getElementsByTagName('for') tags_structer = {} if for_tags: for for_tag in for_tags: # 获取初始化相关参数 left_area_tag = for_tag.getAttribute('left') if for_tag.hasAttribute('left') else '(' right_area_tag = for_tag.getAttribute('right') if for_tag.hasAttribute('right') else ')' sep = for_tag.getAttribute('sep') if for_tag.hasAttribute('sep') else ',' tags_structer[id(for_tag)] = { 'sep': sep, 'left_area_tag': left_area_tag, 'right_area_tag': right_area_tag, 'arg_key': re.sub('\s*', '', re.sub('#{|}', '', for_tag.childNodes[0].data)) } # new = xml.dom.minidom.Text().replaceWholeText('111') # 创建文本节点 return tags_structer def get_executable_sql(self, sql_datas: dict, method_name: str, args=None) -> str: """ 获取实际执行语句 :param method_name: :param args: :return: """ result_sql = deepcopy(sql_datas)[method_name] # print(result_sql) if isinstance(result_sql, str): return result_sql if isinstance(result_sql, dict) and args and isinstance(args, dict): # 如果存在参数 # 处理条件标签 self.check_if(result_sql, args) # print(result_sql) self.check_for(result_sql, args) # print(result_sql) return ''.join(result_sql['main_sql']) # return ''.join([item if isinstance(item,str) else '' for item in result_sql['main_sql']]) return ''.join(result_sql['main_sql']) def check_for(self, main_struct: dict, args: dict): """ 检查循环结构 :param main_struct: :param args: :return: """ for_struct = main_struct['for'] if for_struct: # print(for_struct) main_struct_index = 0 for main_struct_item in main_struct['main_sql']: if not isinstance(main_struct_item, str) and main_struct_item in for_struct: # 遍历语句节点 if not isinstance(main_struct_item, str): for_v = for_struct[main_struct_item] arg_sequence = args.get(for_v['arg_key'], None) if arg_sequence: # 处理序列参数 index = 0 for i in arg_sequence: if not isinstance(i, str): arg_sequence[index] = str(i) index += 1 main_struct['main_sql'][main_struct_index] = for_v['left_area_tag'] + \ for_v['sep'].join(arg_sequence) + \ for_v['right_area_tag'] else: raise KeyError('找不到对应序列:' + for_v['arg_key']) main_struct_index += 1 def check_if(self, main_struct: dict, args: dict): """ 检查条件结构 :param if_struct: :param args: :return: """ if_struct = main_struct['if'] if if_struct: main_struct_index = 0 for main_struct_item in main_struct['main_sql']: if not isinstance(main_struct_item, str) and main_struct_item in if_struct: # 存在条件结构数据 compile_result = True for if_v in if_struct[main_struct_item].values(): # 遍历每个if标签 # 根据操作符进行判断 compile_key = if_v['_key'] if if_v['operator'] == marks['has']: if compile_key in args: # print(args.get(compile_key)) compile_result = compile_result and args.get(compile_key) else: compile_result = False else: # dict类型(标签属性结构) if compile_key in args: arg_value = args[compile_key] if isinstance(arg_value, str): compile_result = (if_v['_value'] == arg_value) and compile_result else: compile_result = eval( if_v['_value'] + if_v['operator'] + str(arg_value) ) and compile_result if compile_result: # 判断标签全属性条件是否通过 # 执行表达式结果 main_struct['main_sql'][main_struct_index] = if_v['add_sql'] else: main_struct['main_sql'][main_struct_index] = '' main_struct_index += 1 PK!qdophon_db/sqllite/__init__.pyfrom . import db_init class SqliteManager(): pass singleton = SqliteManager() def init(): db_init.init_config() for k, v in db_init.cache_config.items(): setattr(singleton, k.replace('sqlite_', ''), v) return singleton PK!Q9 dophon_db/sqllite/db_init.pyimport sqlite3 import types from dophon_db import properties from dophon_logger import * logger = get_logger(DOPHON) logger.inject_logger(globals()) config = properties.sqlite cache_config = {} class Table(): """ 数据库表类 """ def __init__(self, table_structure: set): for field in table_structure: setattr(self,f'_{field}',None) # 获取指定列名数据(property -> get) getter_module_code = compile( f'@property\ndef {field}(self):\n\treturn self._{field}', '', 'exec' ) gfunction_code = [c for c in getter_module_code.co_consts if isinstance(c, types.CodeType)][0] getter_method = types.FunctionType(gfunction_code, {}) setattr(self, f'{field}', getter_method) # 获取指定列名数据(property -> set) setter_module_code = compile( f'@{field}.setter\ndef set_{field}(self,val):\n\tprint("不可修改的参数")', '', 'exec' ) sfunction_code = [c for c in setter_module_code.co_consts if isinstance(c, types.CodeType)][0] setter_method = types.FunctionType(sfunction_code, {}) setattr(self, f'set_{field}', setter_method) setattr(self, field, property(getter_method, setter_method)) self.__fields = table_structure class DBObj(): """ 数据库对象 """ __database = ':memory:' def __init__(self, database: str = ''): self.__database = database if database else self.__database # 查询所有的表 result = self.execute('select name from sqlite_master where type=\'table\' order by name;') self.table_fields_struct = {} for item in result['data']: table_name = item[0] # 查询所有表结构 inner_result = self.execute(f'select * from {table_name};') table_fields = set({}) for table_column in inner_result['desc']: table_fields.add(table_column[0]) self.table_fields_struct[table_name] = table_fields setattr(self, f'{table_name}', Table(table_fields)) def execute(self, sql: str) -> dict: """ 执行语句,整理结果集 :param sql: sql语句 :return: """ init_conn = sqlite3.connect(self.__database) init_cursor = init_conn.cursor() init_cursor.execute(sql) result = { 'desc': init_cursor.description, 'data': init_cursor.fetchall() } init_conn.commit() init_conn.close() return result def sort_result(self, result: dict): """ 整理结果集 :param result: :return: """ dict_result = [] for result_item in result['data']: cache = {result['desc'][index][0]: result_item[index] for index in range(len(result_item))} dict_result.append(cache) return dict_result def init_config(): """ 初始化sqlite配置 :return: """ if cache_config: logger.warning('未生效任何配置') return for alias_name, alias_config in config.items(): if isinstance(alias_config, dict): cache_config['sqlite_' + alias_name] = DBObj(alias_config['database']) PK!  dophon_db/utils/__init__.pyDICT = dict OBJECT = object # 定义返回类型(默认字典类型) result_type = DICT def get_db_cluster_info(): from dophon_db import properties result = [] for cluster in properties.db_cluster: alias = cluster.get('alias', 'default_db') host = cluster.get('host', 'localhost') port = cluster.get('port', 3306) db = cluster.get('database', 'database') user = cluster.get('user', 'root') password = cluster.get('password', 'root') chartset = cluster.get('chartset', 'utf8') tables = cluster.get('tables', []) if isinstance(cluster.get('tables'), list) else \ [cluster.get('tables')] if 'tables' in cluster else [] result.append({ alias: { 'alias': alias, '__host': host, '__port': port, '__database': db, '__user': user, '__password': password, '__chartset': chartset, 'table_list': tables } }) return result def sort_result(data: list, description: tuple, result: list) -> list: """ # 整理结果集并返回 :param data: 数据集 :param description:数据描述 :param result: 结果列表(或许产生多个集合) :return: """ for index in range(len(data)): item = data[index] r_item = {} if result_type is DICT: for i in range(len(item)): colName = description[i][0] val = item[i] value = 'none' if type(val) is not type(None): value = str(val) # 组装data r_item[colName] = value # 组装结果集 result.append(r_item) elif result_type is OBJECT: class_obj = type(str(id(data)), (), {}) # 组装返回临时类 for i in range(len(item)): colName = description[i][0] val = item[i] value = None if type(val) is not type(None): value = str(val) class_obj = c_prop(class_obj, colName, value, use_setter=False) result.append(class_obj()) return result def show_banner(): print(f""" dP dP dP dP 88 88 88 88 .d888b88 .d8888b. 88d888b. 88d888b. .d8888b. 88d888b. .d888b88 88d888b. 88' `88 88' `88 88' `88 88' `88 88' `88 88' `88 88' `88 88' `88 88. .88 88. .88 88. .88 88 88 88. .88 88 88 88. .88 88. .88 `88888P8 `88888P' 88Y888P' dP dP `88888P' dP dP `88888P8 88Y8888' 88 dP depends on: pyMysql author:Callmee """)PK!:U&-&-'dophon_db-1.2.7.post2.dist-info/LICENSE Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. PK!HڽTU%dophon_db-1.2.7.post2.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!Hw(dophon_db-1.2.7.post2.dist-info/METADATAJ@)bI!X j47 t.nKŋ|pC7NPL%hiH S1`o7jn #LFqͺyjṼɖ|Zp4t4.-rђa ʢKSt(7(.%[hpN}*˖c y 1/#ߏ\(ΞWzZTc^5Iq4 ݱ=păZmrPK!Hs| &dophon_db-1.2.7.post2.dist-info/RECORDDzH}? 7Y'p 6<3T]Ut/~yҜiǟaj0W@C~G ?uSχ8TtJ,7Qy^֡phhF۽J k:F;Pc5Q 4/k{ZC켨SD#7aWϨ;>/iP}$pBkލyI} 9'7lߙ'|5CjcQhbW&⼘p\h|I-/  c7>ѬFS!ʦ xզ,W=0|J\0c# foТ0G1^[䅫E6rZJh5c,֬~dwqaˌP쨜1_IYݬplg+,f7pptqB N6}xѢtsk3vQrc1xK5MW,)fOrȥuBVj&lξEA:(Qhqtg >1ԧVRu1qjTcH~6. ^撲ܩo,9x:6偶L.`2K/+\X`h{Pgdk#ӿ[r')bYķ31'f_IS>mNT圚GeEm|f#8@ۈ'QE]e`gEU}+q ax~rSF@I $4yʫ6[iרө{̆QyO atX=OSW =aYZE4yqi0ѸUg`-g9a$+PԃSih b_^oS>x钾`+E;LAY{&ϯ2(qX@Pݔ_#}f߽1ͫ-(2&5iSqJ6iMfv4V<;Nlu’ {ㅙXģW~D)~9=N[[P= &] u%qmfa OJXa_E&U}K:p呤yt{l{Ӎ1BbCT:l]jt;_L!qT N'a'8Fw*o{xeۑ2/jaDb0P͟ %q'7OA ˳wM}7q$9`&V F^Hu%_U \¼;]@V㏷rCT?r$ ):BnS_)pCPK!zkattdophon_db/__init__.pyPK! dophon_db/Connection.pyPK!?? dophon_db/const/__init__.pyPK!m{? dophon_db/const/regix_str.pyPK!a/? dophon_db/mysql/__init__.pyPK!"kdophon_db/mysql/binlog/__init__.pyPK!զm dophon_db/mysql/binlog/Schued.pyPK!A0#dophon_db/mysql/binlog/ZipBinLog.pyPK!ha H!dophon_db/mysql/cluster.pyPK!*C*]*]e+dophon_db/mysql/core.pyPK!oh Ĉdophon_db/mysql/PageHelper.pyPK!"ɒdophon_db/mysql/remote/__init__.pyPK!H9dophon_db/mysql/remote/Cell.pyPK!Epdophon_db/mysql/single.pyPK!'۰ $dophon_db/mysql/sql_util/__init__.pyPK!VTdophon_db/orm/__init__.pyPK!%$.$. bdophon_db/orm/db_obj/__init__.pyPK!3G 5/5//dophon_db/orm/db_obj/function_class/__init__.pyPK!X!F!dophon_db/orm/db_obj/type_dict.pyPK!;} } <dophon_db/orm/manager_init.pyPK!i'i'Hdophon_db/orm/query_structor.pyPK!/  opdophon_db/Pool.pyPK!KYUU"U"|dophon_db/reader/__init__.pyPK!q;dophon_db/sqllite/__init__.pyPK!Q9 ~dophon_db/sqllite/db_init.pyPK!  udophon_db/utils/__init__.pyPK!:U&-&-'ɺdophon_db-1.2.7.post2.dist-info/LICENSEPK!HڽTU%4dophon_db-1.2.7.post2.dist-info/WHEELPK!Hw(dophon_db-1.2.7.post2.dist-info/METADATAPK!Hs| & dophon_db-1.2.7.post2.dist-info/RECORDPK