PKtM1Oidataapi_transform/__init__.py"""convert data from a format to another format, read or write from file or database, suitable for iDataAPI""" from .cli import main from .DataProcess.Config.ConfigUtil import WriterConfig from .DataProcess.Config.ConfigUtil import GetterConfig from .DataProcess.ProcessFactory import ProcessFactory class ManualConfig(object): @staticmethod def set_config(ini_path): from .DataProcess.Config.MainConfig import main_config_box from .DataProcess.Config.DefaultValue import DefaultVal main_config_box.read_config(ini_path) DefaultVal.refresh() @staticmethod def disable_log(): from .DataProcess.Config.LogConfig import remove_log remove_log() @staticmethod def set_log_path(log_path, max_log_file_bytes): """ :param log_path: directory where log stores, i.e ===> /Desktop/logs/ :param max_log_file_bytes: max log file size, in bytes, i.e: 5242880(5MB) :return: """ from .DataProcess.Config.MainConfig import main_config_box from .DataProcess.Config.DefaultValue import DefaultVal main_config_box.config_log(log_path, max_log_file_bytes) __version__ = '1.6.3' PK}Lfe""idataapi_transform/cli.pyimport json import asyncio import argparse from .DataProcess.Config.DefaultValue import DefaultVal from .DataProcess.Config.ConfigUtil import GetterConfig from .DataProcess.Config.ConfigUtil import WriterConfig from .DataProcess.ProcessFactory import ProcessFactory class Args(object): from_choices = ["API", "ES", "CSV", "XLSX", "JSON", "REDIS", "MYSQL", "MONGO"] from_desc = "argument 'from' can only set to one of 'API', 'ES', 'CSV', 'XLSX', " \ "'JSON'(means json line by line file), 'REDIS', 'MYSQL' or 'MONGO'" to_choices = ["csv", "xlsx", "json", "txt", "es", "redis", 'mysql', 'mongo'] to_desc = "argument 'to' can only set to one of \"csv\", \"xlsx\", \"json\", \"txt\" \"es\", \"json\", \"redis\", " \ "\"mysql\", \"mongo\", \"json\" will write 'json.dumps(item)' line by line. " \ "\"txt\" will write each item line by line, each element in each line is separated by 'space' bu default" source_desc = """ argument 'source', When argument '-from' set to 'ES', source should be 'index:doc_type' When argument 'from' set tp 'API', source should be 'http://... argument 'from' set tp 'REDIS', source should be key name argument 'from' set tp 'MYSQL', source should be table name argument 'from' set to others, source should be file path """ dest_desc = "argument 'dest', filename to save result, no need for suffix, " \ "ie '/Desktop/result', default: './result'\n" \ "When argument '-to' set to 'ES', dest should be 'index:doc_type'" per_limit_desc = "amount of data buffered, when buffer filled, Program will write buffered data to 'dest', default 100" max_limit_desc = "write at most 'max_limit' data to 'dest', if 'max_limit' set to 0, means no limit, default to None" retry_desc = "when fetch data failed, retry at most 'retry' time, default 3" r_encoding_desc = "encoding of input file, ignore for xlsx format, default 'utf8'" w_encoding_desc = "encoding of output file, ignore for xlsx format, default 'utf8'" filter_desc = "file contains a 'my_filter(item)' function for filter" param_file_desc = """When you have many item save in id.json, --param_file './id.json::id::pid' means open './id.json ', read each json object line by line, use each_json['id'] as the parameter 'pid' and add it to the tail part of 'source'. --param_file can be either "filename.json::json_param::request_param" or "filename.txt::request_param" """ expand_desc = """If your item is {"a": {"b": "c"}, "b": "d"}, --expand 1 will make your item become {"a_b": "c", "b": "d"}, --expand N means expand at most N level deep of your object, --expand -1 means expand all level -- expand 0 means no expand of your item. Default 0. """ qsn_desc = """quote scientific notation, ie: 4324234234234234123123 will become 4.32423423423423E+021 in normal csv, If quote like '4324234234234234123123', it won't become scientific notation, Only work for output format 'csv' --qsn True means quote scientific notation, --qsn False means not quote scientific notation""" query_body_desc = """ElasticSearch query body, size has same function as "--limit", i.e: body = { "size": 100, "_source": { "includes": ["location", "title", "city", "id"] }, "query": { "bool": { "must": [ { "term": {"appCode": {"value": "ctrip"}} } ] } } } """ write_mode_desc = """'w' or 'a+'""" key_type_desc = """redis data type to operate, options: [LIST] or [HASH], default: [LIST]""" getter_config_map = { Args.from_choices[0]: GetterConfig.RAPIConfig, Args.from_choices[1]: GetterConfig.RESConfig, Args.from_choices[2]: GetterConfig.RCSVConfig, Args.from_choices[3]: GetterConfig.RXLSXConfig, Args.from_choices[4]: GetterConfig.RJsonConfig, Args.from_choices[5]: GetterConfig.RRedisConfig, Args.from_choices[6]: GetterConfig.RMySQLConfig, Args.from_choices[7]: GetterConfig.RMongoConfig } writer_config_map = { Args.to_choices[0]: WriterConfig.WCSVConfig, Args.to_choices[1]: WriterConfig.WXLSXConfig, Args.to_choices[2]: WriterConfig.WJsonConfig, Args.to_choices[3]: WriterConfig.WJsonConfig, Args.to_choices[4]: WriterConfig.WESConfig, Args.to_choices[5]: WriterConfig.WRedisConfig, Args.to_choices[6]: WriterConfig.WMySQLConfig, Args.to_choices[7]: WriterConfig.WMongoConfig } def get_arg(): parser = argparse.ArgumentParser(prog="idataapi_transform", description='convert data from a format to another format, ' 'read/write from file or database, suitable for iDataAPI') parser.add_argument("from", choices=Args.from_choices, help=Args.from_desc, type=str.upper) parser.add_argument("to", choices=Args.to_choices, help=Args.to_desc, type=str.lower) parser.add_argument("source", help=Args.source_desc) parser.add_argument("dest", help=Args.dest_desc, default=DefaultVal.dest, nargs="?") parser.add_argument("--per_limit", default=DefaultVal.per_limit, type=int, help=Args.per_limit_desc) parser.add_argument("--max_limit", default=DefaultVal.max_limit, type=int, help=Args.max_limit_desc) parser.add_argument("--max_retry", default=DefaultVal.max_retry, type=int, help=Args.retry_desc) parser.add_argument("--r_encoding", default=DefaultVal.default_encoding, help=Args.r_encoding_desc) parser.add_argument("--w_encoding", default=DefaultVal.default_encoding, help=Args.w_encoding_desc) parser.add_argument("--filter", default=None, help=Args.filter_desc) parser.add_argument("--expand", default=None, type=int, help=Args.expand_desc) parser.add_argument("--qsn", default=None, type=bool, help=Args.qsn_desc) parser.add_argument("--query_body", default=DefaultVal.query_body, type=str, help=Args.query_body_desc) parser.add_argument("--write_mode", default=DefaultVal.default_file_mode_w, type=str, help=Args.write_mode_desc) parser.add_argument("--key_type", default=DefaultVal.default_key_type, type=str.upper, help=Args.key_type_desc) return parser.parse_args() def get_filter(filter_file): if not filter_file: return None with open(filter_file, "r") as f: exec(f.read()) func = locals()["my_filter"] return func async def getter_to_writer(getter, writer): with writer as safe_writer: async for items in getter: if asyncio.iscoroutinefunction(safe_writer.write): await safe_writer.write(items) else: safe_writer.write(items) def main(): args = get_arg() from_ = getattr(args, "from") from_args = list() from_kwargs = dict() to_args = list() to_kwargs = dict() if from_ != Args.from_choices[0]: # not api from_args.extend(args.source.split(":")) else: from_args.extend([args.source]) from_kwargs["encoding"] = args.r_encoding from_kwargs["key_type"] = args.key_type if args.query_body: try: from_kwargs["query_body"] = json.loads(args.query_body) except Exception as e: raise SyntaxError("--query_body must be json serialized") for key in ("per_limit", "max_limit", "max_retry"): from_kwargs[key] = getattr(args, key) to_kwargs["filter_"] = get_filter(args.filter) to_kwargs["encoding"] = args.w_encoding to_kwargs["mode"] = args.write_mode to_kwargs["key_type"] = args.key_type for key in ("max_retry", "expand", "qsn"): to_kwargs[key] = getattr(args, key) if from_ not in getter_config_map: raise ValueError("argument from must be in %s" % (str(Args.from_choices), )) getter_config = getter_config_map[from_](*from_args, **from_kwargs) getter = ProcessFactory.create_getter(getter_config) if args.to == Args.to_choices[4]: # es indices, doc_type = args.dest.split(":") to_args.append(indices) to_args.append(doc_type) elif args.to in Args.to_choices[5:]: # redis, mysql, mongo if args.dest == DefaultVal.dest: to_args.append(DefaultVal.dest_without_path) else: to_args.append(args.dest) else: dest = args.dest + "." + args.to to_args.append(dest) writer_config = writer_config_map[args.to](*to_args, **to_kwargs) writer = ProcessFactory.create_writer(writer_config) loop = asyncio.get_event_loop() loop.run_until_complete(getter_to_writer(getter, writer)) if __name__ == "__main__": main() PK~Lɭ01 1 0idataapi_transform/DataProcess/ProcessFactory.py# config from .Config.MainConfig import main_config from .Config.ConfigUtil import GetterConfig from .Config.ConfigUtil import WriterConfig from .DataGetter.ESGetter import ESScrollGetter from .DataGetter.CSVGetter import CSVGetter from .DataGetter.APIGetter import APIGetter, APIBulkGetter from .DataGetter.JsonGetter import JsonGetter from .DataGetter.XLSXGetter import XLSXGetter from .DataGetter.RedisGetter import RedisGetter from .DataGetter.MySQLGetter import MySQLGetter from .DataGetter.MongoGetter import MongoGetter from .DataWriter.CSVWriter import CSVWriter from .DataWriter.ESWriter import ESWriter from .DataWriter.JsonWriter import JsonWriter from .DataWriter.TXTWriter import TXTWriter from .DataWriter.XLSXWriter import XLSXWriter from .DataWriter.RedisWriter import RedisWriter from .DataWriter.MySQLWriter import MySQLWriter from .DataWriter.MongoWriter import MongoWriter class ProcessFactory(object): config_getter_map = { GetterConfig.RAPIConfig: APIGetter, GetterConfig.RCSVConfig: CSVGetter, GetterConfig.RESConfig: ESScrollGetter, GetterConfig.RJsonConfig: JsonGetter, GetterConfig.RXLSXConfig: XLSXGetter, GetterConfig.RAPIBulkConfig: APIBulkGetter, GetterConfig.RRedisConfig: RedisGetter, GetterConfig.RMySQLConfig: MySQLGetter, GetterConfig.RMongoConfig: MongoGetter } config_writer_map = { WriterConfig.WCSVConfig: CSVWriter, WriterConfig.WESConfig: ESWriter, WriterConfig.WJsonConfig: JsonWriter, WriterConfig.WTXTConfig: TXTWriter, WriterConfig.WXLSXConfig: XLSXWriter, WriterConfig.WRedisConfig: RedisWriter, WriterConfig.WMySQLConfig: MySQLWriter, WriterConfig.WMongoConfig: MongoWriter } @staticmethod def create_getter(config): """ create a getter based on config :return: getter """ for config_class, getter_class in ProcessFactory.config_getter_map.items(): if isinstance(config, config_class): return getter_class(config) raise ValueError("create_getter must pass one of the instance of [RAPIConfig, RCSVConfig, RESConfig, " "RJsonConfig, RXLSXConfig, RAPIBulkConfig, RRedisConfig, RMySQLConfig, RMongoConfig]") @staticmethod def create_writer(config): """ create a writer based on config :return: a writer """ for config_class, writer_class in ProcessFactory.config_writer_map.items(): if isinstance(config, config_class): return writer_class(config) else: raise ValueError("create_writer must pass one of the instance of [WCSVConfig, WESConfig, WJsonConfig, " "WTXTConfig, WXLSXConfig, WRedisConfig, WMySQLConfig, WMongoConfig]") PK狗K*idataapi_transform/DataProcess/__init__.pyPK<9MCG8idataapi_transform/DataProcess/Config/ConnectorConfig.pyimport aiohttp import asyncio import inspect from .MainConfig import main_config class _SessionManger(object): def __init__(self, concurrency_limit=None, loop=None): concurrency_limit = main_config()["main"].getint("concurrency") if concurrency_limit is None else concurrency_limit self.session = self._generate_session(concurrency_limit=concurrency_limit, loop=loop) @staticmethod def _generate_connector(limit=None, loop=None): """ https://github.com/KeepSafe/aiohttp/issues/883 if connector is passed to session, it is not available anymore """ limit = main_config()["main"].getint("concurrency") if limit is None else limit if not loop: loop = asyncio.get_event_loop() return aiohttp.TCPConnector(limit=limit, loop=loop) @staticmethod def _generate_session(concurrency_limit=None, loop=None): if not loop: loop = asyncio.get_event_loop() concurrency_limit = main_config()["main"].getint("concurrency") if concurrency_limit is None else concurrency_limit return aiohttp.ClientSession(connector=_SessionManger._generate_connector(limit=concurrency_limit, loop=loop), loop=loop) def get_session(self): return self.session def __del__(self): try: if inspect.iscoroutinefunction(self.session.close): loop = asyncio.get_event_loop() loop.run_until_complete(self.session.close()) else: self.session.close() except Exception as e: pass session_manger = _SessionManger() PKQkM̷% % 5idataapi_transform/DataProcess/Config/DefaultValue.pyimport os import hashlib from .MainConfig import main_config class DefaultValObject(object): def __init__(self): self.refresh() def refresh(self): self.main_config = main_config() self.per_limit = self.main_config["main"].getint("per_limit") self.max_limit = self.main_config["main"].get("max_limit") if self.max_limit != "None": self.max_limit = int(self.max_limit) else: self.max_limit = None self.max_retry = self.main_config["main"].getint("max_retry") self.random_min_sleep = self.main_config["main"].getint("random_min_sleep") self.random_max_sleep = self.main_config["main"].getint("random_max_sleep") # redis self.redis_host = self.main_config["redis"].get("host") self.redis_port = self.main_config["redis"].getint("port") self.redis_db = self.main_config["redis"].get("db") self.redis_password = self.main_config["redis"].get("password") self.redis_timeout = self.main_config["redis"].getint("timeout") self.redis_encoding = self.main_config["redis"].get("encoding") self.redis_direction = self.main_config["redis"].get("direction") self.redis_compress = self.main_config["redis"].getboolean("compress") self.redis_need_del = self.main_config["redis"].getboolean("need_del") # mysql config self.mysql_host = self.main_config["mysql"].get("host") self.mysql_port = self.main_config["mysql"].getint("port") self.mysql_user = self.main_config["mysql"].get("user") self.mysql_password = self.main_config["mysql"].get("password") self.mysql_database = self.main_config["mysql"].get("database") self.mysql_encoding = self.main_config["mysql"].get("encoding") if not self.mysql_encoding: self.mysql_encoding = self.default_encoding # mongo config self.mongo_host = self.main_config["mongo"].get("host") self.mongo_port = self.main_config["mongo"].getint("port") self.mongo_username = self.main_config["mongo"].get("username") self.mongo_password = self.main_config["mongo"].get("password") self.mongo_database = self.main_config["mongo"].get("database") default_file_mode_r = "r" default_file_mode_w = "w" default_encoding = "utf8" new_line = "\n" join_val = " " title = "example" qsn = None query_body = None dest_without_path = "result" dest = os.getcwd() + "/" + dest_without_path interval = 5 concurrency = 50 default_key_type = "LIST" report_interval = 10 success_ret_code = ("100002", "100301", "100103") trim_to_max_limit = False exclude_filtered_to_max_limit = True @staticmethod def default_id_hash_func(item): if "appCode" in item and item["appCode"] and "id" in item and item["id"]: value = (item["appCode"] + "_" + item["id"]).encode("utf8") else: value = str(item).encode("utf8") return hashlib.md5(value).hexdigest() DefaultVal = DefaultValObject() PK[}Mi[,[,1idataapi_transform/DataProcess/Config/ESConfig.pyimport asyncio import aiohttp import json import time import logging import copy from elasticsearch_async.connection import AIOHttpConnection as OriginAIOHttpConnection from elasticsearch_async.transport import AsyncTransport as OriginAsyncTransport from elasticsearch_async.transport import ensure_future from elasticsearch import TransportError from aiohttp.client_exceptions import ServerFingerprintMismatch from elasticsearch.exceptions import ConnectionError, ConnectionTimeout, SSLError from elasticsearch.compat import urlencode from elasticsearch.connection_pool import ConnectionPool, DummyConnectionPool from elasticsearch_async import AsyncTransport from elasticsearch_async import AsyncElasticsearch from elasticsearch.client import _make_path, query_params es_hosts = None if hasattr(aiohttp, "Timeout"): async_timeout_func = aiohttp.Timeout else: import async_timeout async_timeout_func = async_timeout.timeout def init_es(hosts, es_headers, timeout_): global es_hosts, AsyncElasticsearch, AsyncTransport es_hosts = hosts if not es_hosts: return False class MyAsyncTransport(OriginAsyncTransport): """ Override default AsyncTransport to add timeout """ def perform_request(self, method, url, params=None, body=None, timeout=None, headers=None): if body is not None: body = self.serializer.dumps(body) # some clients or environments don't support sending GET with body if method in ('HEAD', 'GET') and self.send_get_body_as != 'GET': # send it as post instead if self.send_get_body_as == 'POST': method = 'POST' # or as source parameter elif self.send_get_body_as == 'source': if params is None: params = {} params['source'] = body body = None if body is not None: try: body = body.encode('utf-8') except (UnicodeDecodeError, AttributeError): # bytes/str - no need to re-encode pass ignore = () if params: ignore = params.pop('ignore', ()) if isinstance(ignore, int): ignore = (ignore,) return ensure_future(self.main_loop(method, url, params, body, ignore=ignore, timeout=timeout, headers=headers), loop=self.loop) @asyncio.coroutine def main_loop(self, method, url, params, body, ignore=(), timeout=None, headers=None): for attempt in range(self.max_retries + 1): connection = self.get_connection() try: status, headers, data = yield from connection.perform_request( method, url, params, body, ignore=ignore, timeout=timeout, headers=headers) except TransportError as e: if method == 'HEAD' and e.status_code == 404: return False retry = False if isinstance(e, ConnectionTimeout): retry = self.retry_on_timeout elif isinstance(e, ConnectionError): retry = True elif e.status_code in self.retry_on_status: retry = True if retry: # only mark as dead if we are retrying self.mark_dead(connection) # raise exception on last retry if attempt == self.max_retries: raise else: raise else: if method == 'HEAD': return 200 <= status < 300 # connection didn't fail, confirm it's live status self.connection_pool.mark_live(connection) if data: data = self.deserializer.loads(data, headers.get('content-type')) return data class AIOHttpConnection(OriginAIOHttpConnection): """ Override default AIOHttpConnection.perform_request to add headers """ @asyncio.coroutine def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None): url_path = url if params: url_path = '%s?%s' % (url, urlencode(params or {})) url = self.base_url + url_path start = self.loop.time() response = None local_headers = headers if headers else es_headers try: with async_timeout_func(timeout or timeout_ or self.timeout): response = yield from self.session.request(method, url, data=body, headers=local_headers) raw_data = yield from response.text() duration = self.loop.time() - start except Exception as e: self.log_request_fail(method, url, url_path, body, self.loop.time() - start, exception=e) if isinstance(e, ServerFingerprintMismatch): raise SSLError('N/A', str(e), e) if isinstance(e, asyncio.TimeoutError): raise ConnectionTimeout('TIMEOUT', str(e), e) raise ConnectionError('N/A', str(e), e) finally: if response is not None: yield from response.release() # raise errors based on http status codes, let the client handle those if needed if not (200 <= response.status < 300) and response.status not in ignore: self.log_request_fail(method, url, url_path, body, duration, status_code=response.status, response=raw_data) self._raise_error(response.status, raw_data) self.log_request_success(method, url, url_path, body, response.status, raw_data, duration) return response.status, response.headers, raw_data class MyAsyncElasticsearch(AsyncElasticsearch): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if "headers" in kwargs: self.headers = kwargs["headers"] else: self.headers = None async def add_dict_to_es(self, indices, doc_type, items, id_hash_func, app_code=None, actions=None, create_date=None, error_if_fail=True, timeout=None, auto_insert_createDate=True): if not actions: actions = "index" body = "" for item in items: if app_code: item["appCode"] = app_code if auto_insert_createDate and "createDate" not in item: if create_date: item["createDate"] = create_date else: item["createDate"] = int(time.time()) action = { actions: { "_index": indices, "_type": doc_type, "_id": id_hash_func(item) } } if actions == "update": item = {"doc": item} body += json.dumps(action) + "\n" + json.dumps(item) + "\n" try: success = fail = 0 r = await self.transport.perform_request("POST", "/_bulk?pretty", body=body, timeout=timeout, headers=self.headers) if r["errors"]: for item in r["items"]: for k, v in item.items(): if "error" in v: if error_if_fail: # log error logging.error(json.dumps(v["error"])) fail += 1 else: success += 1 else: success = len(r["items"]) return success, fail, r except Exception as e: import traceback logging.error(traceback.format_exc()) logging.error("elasticsearch Exception, give up: %s" % (str(e), )) return None, None, None @query_params('_source', '_source_exclude', '_source_include', 'allow_no_indices', 'allow_partial_search_results', 'analyze_wildcard', 'analyzer', 'batched_reduce_size', 'default_operator', 'df', 'docvalue_fields', 'expand_wildcards', 'explain', 'from_', 'ignore_unavailable', 'lenient', 'max_concurrent_shard_requests', 'pre_filter_shard_size', 'preference', 'q', 'request_cache', 'routing', 'scroll', 'search_type', 'size', 'sort', 'stats', 'stored_fields', 'suggest_field', 'suggest_mode', 'suggest_size', 'suggest_text', 'terminate_after', 'timeout', 'track_scores', 'track_total_hits', 'typed_keys', 'version') def search(self, index=None, doc_type=None, body=None, params=None): # from is a reserved word so it cannot be used, use from_ instead if 'from_' in params: params['from'] = params.pop('from_') if doc_type and not index: index = '_all' return self.transport.perform_request('GET', _make_path(index, doc_type, '_search'), params=params, body=body, headers=self.headers) def __del__(self): """ compatible with elasticsearch-async-6.1.0 """ try: loop = asyncio.get_event_loop() loop.run_until_complete(self.transport.close()) except Exception: pass OriginAIOHttpConnection.perform_request = AIOHttpConnection.perform_request OriginAsyncTransport.perform_request = MyAsyncTransport.perform_request OriginAsyncTransport.main_loop = MyAsyncTransport.main_loop AsyncElasticsearch = MyAsyncElasticsearch return True global_client = None def get_es_client(hosts=None, headers=None): global global_client if not hosts: if global_client is None: global_client = AsyncElasticsearch(hosts=es_hosts) return global_client else: return AsyncElasticsearch(hosts=hosts, headers=headers) """ below add in version 1.0.9, compatible for "close()" ===> future object """ def close(self): try: asyncio.ensure_future(self.connection.close()) except TypeError: pass def connection_pool_close(self): for conn in self.orig_connections: try: asyncio.ensure_future(conn.close()) except TypeError: pass ConnectionPool.close = connection_pool_close DummyConnectionPool.close = close PK̈}Mio2idataapi_transform/DataProcess/Config/LogConfig.pyimport os import logging from logging.handlers import RotatingFileHandler format_str = "%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s" date_formatter_str = '[%Y-%m-%d %H:%M:%S]' formatter = logging.Formatter(format_str, datefmt=date_formatter_str) class SingleLevelFilter(logging.Filter): def __init__(self, passlevel, reject): super(SingleLevelFilter, self).__init__() self.passlevel = passlevel self.reject = reject def filter(self, record): if self.reject: return record.levelno != self.passlevel else: return record.levelno == self.passlevel def init_log(log_dir, max_log_file_bytes, ini_path, manual=False): root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) # console console = logging.StreamHandler() console.setFormatter(formatter) root_logger.addHandler(console) if log_dir: if not os.path.exists(log_dir): logging.error("log_dir(%s)%s not exists, I will not log to file" % (log_dir, "" if manual else " in configure file(%s)" % (ini_path, ))) return False if not max_log_file_bytes: logging.error("log_byte not set, please %s, or I will not log to file" % ("pass log_byte as parameters" if manual else "configure log_byte in configure file(%s)" % (ini_path, ))) return False # info h1 = RotatingFileHandler("%s/info.log" % (log_dir, ), mode="a", maxBytes=max_log_file_bytes, encoding="utf8", backupCount=1) h1.setFormatter(formatter) f1 = SingleLevelFilter(logging.INFO, False) h1.addFilter(f1) root_logger.addHandler(h1) # error h1 = RotatingFileHandler("%s/error.log" % (log_dir, ), mode="a", maxBytes=max_log_file_bytes, encoding="utf8", backupCount=1) h1.setFormatter(formatter) f1 = SingleLevelFilter(logging.ERROR, False) h1.addFilter(f1) root_logger.addHandler(h1) # logging.info("log dir set to: %s" % (log_dir, )) return True return False def remove_log(): root_logger = logging.getLogger() root_logger.handlers.clear() PKC}M}}3idataapi_transform/DataProcess/Config/MainConfig.pyimport os import json import configparser from os.path import expanduser from .LogConfig import init_log, remove_log from .ESConfig import init_es default_configure_content = """ [main] # default max concurrency value for APIGetter concurrency = 50 # buffer size per_limit = 100 # fetch at most max_limit items max_limit = None # max retry for getter before give up if fail to get data max_retry = 3 # sleep interval if fail random_min_sleep = 1 random_max_sleep = 3 [es] # elasticsearch host # hosts = ["localhost:9393"] # elasticsearch headers when perform http request # headers = {"Host": "localhost", "value": "value"} # request timeout, seconds # timeout = 10 [log] # a directory to save log file # path = /Users/zpoint/Desktop/idataapi-transform/logs/ # max byte per log file # log_byte = 5242880 """ redis_config_content = """ [redis] host = localhost port = 0 db = 0 password = timeout = 3 encoding = utf8 # whether need to del the key after get object from redis, 0 means false, 1 means true need_del = 0 # default direction when read/write , "L" means lpop/lpush, "R" means rpop/rpush direction = L """ mysql_config_content = """ [mysql] host = localhost port = 0 user = root password = database = # default charset encoding = utf8 """ mongo_config_content = """ [mongo] host = localhost port = 0 username = password = database = test_database """ main_config_box = None class MainConfig(object): def __init__(self, ini_path=None): global main_config_box main_config_box = self # singleton if not hasattr(self, "__instance"): if not ini_path: home = expanduser("~") ini_path = home + "/idataapi-transform.ini" if not os.path.exists(ini_path): with open(ini_path, "w") as f: f.write(default_configure_content + redis_config_content + mysql_config_content + mongo_config_content) if os.path.exists("./idataapi-transform.ini"): ini_path = "./idataapi-transform.ini" self.read_config(ini_path) def read_config(self, ini_path): self.ini_path = ini_path self.__instance = configparser.ConfigParser() self.__instance.read(ini_path) MainConfig.__instance = self.__instance self.has_log_file = self.__instance.has_log_file = self.config_log() self.has_es_configured = self.__instance.has_es_configured = self.config_es() self.has_redis_configured = self.__instance.has_redis_configured = self.config_redis() self.has_mysql_configured = self.__instance.has_mysql_configured = self.config_mysql() self.has_mongo_configured = self.__instance.has_mongo_configured = self.config_mongo() self.__instance.ini_path = self.ini_path def __call__(self): return self.__instance def config_log(self, log_path=None, max_log_file_bytes=None): remove_log() if log_path: manual = True else: max_log_file_bytes = self.__instance["log"].getint("log_byte") log_path = self.__instance["log"].get("path") manual = False return init_log(log_path, max_log_file_bytes, self.ini_path, manual=manual) def config_es(self): hosts = self.__instance["es"].get("hosts") timeout = self.__instance["es"].getint("timeout") if hosts: try: hosts = json.loads(hosts) except Exception as e: raise ValueError("es host must be json serialized") headers = self.__instance["es"].get("headers") if headers and headers != "None": try: headers = json.loads(headers) except Exception as e: raise ValueError("es headers must be json serialized") else: headers = None return init_es(hosts, headers, timeout) def config_redis(self): try: self.__instance["redis"].get("port") except KeyError as e: with open(self.ini_path, "a+") as f: f.write(redis_config_content) self.__instance.read(self.ini_path) port = self.__instance["redis"].getint("port") return port > 0 def config_mysql(self): try: self.__instance["mysql"].get("port") except KeyError as e: with open(self.ini_path, "a+") as f: f.write(mysql_config_content) self.__instance.read(self.ini_path) port = self.__instance["mysql"].getint("port") return port > 0 def config_mongo(self): try: self.__instance["mongo"].get("port") except KeyError as e: with open(self.ini_path, "a+") as f: f.write(mongo_config_content) self.__instance.read(self.ini_path) port = self.__instance["mongo"].getint("port") return port > 0 main_config = MainConfig() PK VK1idataapi_transform/DataProcess/Config/__init__.pyPK-Mkt?idataapi_transform/DataProcess/Config/ConfigUtil/AsyncHelper.pyclass AsyncGenerator(object): def __init__(self, items, process_func): self.items = items if hasattr(self.items, "__aiter__"): self.is_async = True else: self.is_async = False self.items = self.to_generator(items) self.process_func = process_func def __aiter__(self): return self async def __anext__(self): if self.is_async: r = await self.items.__anext__() return self.process_func(r) else: try: r = next(self.items) return self.process_func(r) except StopIteration: raise StopAsyncIteration @staticmethod def to_generator(items): for i in items: yield i PKZK@>idataapi_transform/DataProcess/Config/ConfigUtil/BaseConfig.pyimport abc class BaseGetterConfig(object, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): pass class BaseWriterConfig(object, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): pass PKtMNE[w[w@idataapi_transform/DataProcess/Config/ConfigUtil/GetterConfig.pyimport json import asyncio import inspect import aioredis try: import aiomysql except Exception as e: pass try: import motor.motor_asyncio except Exception as e: pass from .BaseConfig import BaseGetterConfig from ..ESConfig import get_es_client from ..DefaultValue import DefaultVal from ..ConnectorConfig import session_manger class RAPIConfig(BaseGetterConfig): def __init__(self, source, per_limit=DefaultVal.per_limit, max_limit=DefaultVal.max_limit, max_retry=DefaultVal.max_retry, random_min_sleep=None, random_max_sleep=None, session=None, filter_=None, return_fail=False, tag=None, call_back=None, report_interval=10, success_ret_code=None, done_if=None, trim_to_max_limit=DefaultVal.trim_to_max_limit, exclude_filtered_to_max_limit=DefaultVal.exclude_filtered_to_max_limit, post_body=None, persistent_writer=None, persistent_to_disk_if_give_up=True, **kwargs): """ will request until no more next_page to get, or get "max_limit" items :param source: API to get, i.e. "http://..." :param per_limit: how many items to get per time (counter will add each item after filter) :param max_limit: get at most max_limit items, if not set, get all (counter will add each item before filter) :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param session: aiohttp session to perform request :param filter_: run "transform --help" to see command line interface explanation for detail :param return_fail: if set to True, for each iteration, will return a tuple, api_getter = ProcessFactory.create_getter(RAPIConfig("http://...")) async for items, bad_objects in getter: A = bad_objects[0] A.response: -> json object: '{"appCode": "weixinpro", "dataType": "post", "message": "param error", "retcode": "100005"}', if fail in request, response will be None A.tag: -> tag you pass to RAPIConfig A.source: -> source you pass to RAPIConfig A.post_body: -> http post body :param call_back: a function(can be async function) to call on results before each "async for" return :param report_interval: an integer value, if set to 5, after 5 request times, current response counter still less than 'per_limit', the "async for' won't return to user, there's going to be an INFO log to tell user what happen :param success_ret_code: ret_code indicate success, default is ("100002", "100301", "100103") ===> ("search no result", "account not found", "account processing") :param done_if: the APIGetter will automatically fetch next page until max_limit or no more page, if you provide a function, APIGetter will terminate fetching next page when done_if(items) return True :param trim_to_max_limit: set max_limit to the precise value, default max_limit is rough value :param exclude_filtered_to_max_limit: max_limit including filtered object or excluding filtered object :param post_body: POST with post_body instead of get :param persistent_writer: corporate with RAPIBulkConfig :param persistent_to_disk_if_give_up: corporate with RAPIBulkConfig, when retry to max_retry times, still fail to get result, whether regard this job as success and persistent to disk or not :param args: :param kwargs: Example: api_config = RAPIConfig("http://...") api_getter = ProcessFactory.create_getter(api_config) async for items in api_getter: print(items) """ super().__init__() if not random_min_sleep: random_min_sleep = DefaultVal.random_min_sleep if not random_max_sleep: random_max_sleep = DefaultVal.random_max_sleep if not success_ret_code: success_ret_code = DefaultVal.success_ret_code self.source = source self.per_limit = per_limit self.max_limit = max_limit self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.session = session_manger.get_session() if not session else session self.filter = filter_ self.return_fail = return_fail self.tag = tag self.call_back = call_back self.report_interval = report_interval self.success_ret_code = success_ret_code self.done_if = done_if self.trim_to_max_limit = trim_to_max_limit self.exclude_filtered_to_max_limit = exclude_filtered_to_max_limit if post_body: if not isinstance(post_body, (bytes, str)): post_body = json.dumps(post_body).encode(DefaultVal.default_encoding) self.post_body = post_body self.persistent_writer = persistent_writer self.persistent_to_disk_if_give_up = persistent_to_disk_if_give_up class RCSVConfig(BaseGetterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_r, encoding=DefaultVal.default_encoding, per_limit=None, max_limit=None, filter_=None, **kwargs): """ :param filename: filename to read :param mode: file open mode, i.e "r" :param encoding: file encoding i.e "utf8" :param per_limit: how many items to get per time :param max_limit: get at most max_limit items, if not set, get all :param filter_: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: csv_config = RJsonConfig("./result.csv", encoding="gbk") csv_getter = ProcessFactory.create_getter(csv_config) async for items in csv_getter: print(items) # both async generator and generator implemented for items in csv_getter: print(items) """ super().__init__() if not per_limit: per_limit = DefaultVal.per_limit if not max_limit: max_limit = DefaultVal.max_limit self.filename = filename self.mode = mode self.encoding = encoding self.per_limit = per_limit self.max_limit = max_limit self.filter = filter_ class RESConfig(BaseGetterConfig): def __init__(self, indices, doc_type, per_limit=None, max_limit=None, scroll="1m", query_body=None, return_source=True, max_retry=None, random_min_sleep=None, random_max_sleep=None, filter_=None, hosts=None, headers=None, **kwargs): """ :param indices: elasticsearch indices :param doc_type: elasticsearch doc_type :param per_limit: how many items to get per request :param max_limit: get at most max_limit items, if not set, get all :param scroll: default is "1m" :param query_body: default is '{"size": "per_limit", "query": {"match_all": {}}}' :param return_source: if set to True, will return [item , ..., itemN], item is the "_source" object if set to False, will return whatever elasticsearch return, i.e {"hits": {"total": ...}} :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param filter_: run "transform --help" to see command line interface explanation for detail, only work if return_source is False :param hosts: elasticsearch hosts, list type, i.e: ["localhost:8888", "127.0.0.2:8889"] :param headers: headers when perform http requests to elasticsearch, dict type, i.e: {"Host": "aaa", "apikey": "bbb"} :param kwargs: Example: body = { "size": 100, "_source": { "includes": ["likeCount", "id", "title"] } } es_config = RESConfig("post20170630", "news", max_limit=1000, query_body=body) es_getter = ProcessFactory.create_getter(es_config) async for items in es_getter: print(item) """ super().__init__() if not random_min_sleep: random_min_sleep = DefaultVal.random_min_sleep if not random_max_sleep: random_max_sleep = DefaultVal.random_max_sleep if not per_limit: per_limit = DefaultVal.per_limit if not max_limit: max_limit = DefaultVal.max_limit if not max_retry: max_retry = DefaultVal.max_retry if not DefaultVal.main_config.has_es_configured: raise ValueError("You must config es_hosts before using Elasticsearch, Please edit configure file: %s" % (DefaultVal.main_config.ini_path, )) if not query_body: query_body = { "size": per_limit, "query": { "match_all": {} } } self.query_body = query_body self.indices = indices self.doc_type = doc_type self.per_limit = per_limit self.max_limit = max_limit self.scroll = scroll self.es_client = get_es_client(hosts=hosts, headers=headers) self.return_source = return_source self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.filter = filter_ class RJsonConfig(BaseGetterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_r, encoding=DefaultVal.default_encoding, per_limit=None, max_limit=None, filter_=None, **kwargs): """ :param filename: line by line json file to read :param mode: file open mode, i.e "r" :param encoding: file encoding i.e "utf8" :param per_limit: how many items to get per time :param max_limit: get at most max_limit items, if not set, get all :param filter_: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: json_config = RJsonConfig("./result.json") json_getter = ProcessFactory.create_getter(json_config) async for items in json_getter: print(items) # both async generator and generator implemented for items in json_getter: print(items) """ super().__init__() if not per_limit: per_limit = DefaultVal.per_limit if not max_limit: max_limit = DefaultVal.max_limit self.filename = filename self.mode = mode self.encoding = encoding self.per_limit = per_limit self.max_limit = max_limit self.filter = filter_ class RXLSXConfig(BaseGetterConfig): def __init__(self, filename, per_limit=None, max_limit=None, sheet_index=0, filter_=None, **kwargs): """ :param filename: filename to read :param per_limit: how many items to get per time :param max_limit: get at most max_limit items, if not set, get all :param sheet_index: which sheet to get, 0 means 0th sheet :param filter_: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: xlsx_config = RXLSXConfig("./result.xlsx") xlsx_getter = ProcessFactory.create_getter(xlsx_config) async for items in xlsx_getter: print(items) # both async generator and generator implemented for items in xlsx_getter: print(items) """ super().__init__() if not per_limit: per_limit = DefaultVal.per_limit if not max_limit: max_limit = DefaultVal.max_limit self.filename = filename self.per_limit = per_limit self.max_limit = max_limit self.sheet_index = sheet_index self.filter = filter_ class RAPIBulkConfig(BaseGetterConfig): def __init__(self, sources, interval=DefaultVal.interval, concurrency=None, filter_=None, return_fail=False, done_if=None, trim_to_max_limit=DefaultVal.trim_to_max_limit, exclude_filtered_to_max_limit=DefaultVal.exclude_filtered_to_max_limit, persistent=False, persistent_key=None, persistent_start_fresh_if_done=True, persistent_to_disk_if_give_up=True, **kwargs): """ :param sources: an iterable object (can be async generator), each item must be "url" or instance of RAPIConfig :param interval: integer or float, each time you call async generator, you will wait for "interval" seconds and get all items fetch during this "interval", notice if sources is an "async generator", the "interval" seconds will exclude the time processing async fenerator :param concurrency: how many concurrency task run, default read from config file, if concurrency set, only string(url) in "sources" will work with this concurrency level, RAPIConfig instance won't :param filter_: run "transform --help" to see command line interface explanation for detail :param return_fail: if set to True, for each iteration, will return a tuple, api_getter = ProcessFactory.create_getter(RAPIBulkConfig([...])) async for items, bad_objects in getter: A = bad_objects[0] A.response: -> json object: '{"appCode": "weixinpro", "dataType": "post", "message": "param error", "retcode": "100005"}', if fail in request, response will be None A.tag: -> tag you pass to RAPIConfig A.source: -> source you pass to RAPIConfig :param done_if: if will only work if the source[n] is type string, if the source[n] is type RAPIConfig, it won't work, please refer to RAPIConfig for more detail :param trim_to_max_limit: set max_limit to the precise value, default max_limit is rough value :param exclude_filtered_to_max_limit: max_limit including filtered object or excluding filtered object :param persistent: whether save progress to disk, if set to true, the job progress will be persistent to disk every "interval" seconds :param persistent_key: the key to identify the task :param persistent_start_fresh_if_done: if all task done, whether remove the persistent record file, if the persistent file hasn't been removed and all of the jobs finished, next time you run the program, there will be no job to schedule :param persistent_to_disk_if_give_up: if there's a job fail after retry max_retry times, whether regard this job as success and persistent to disk or not :param kwargs: Example: sources = ["http://....", "http://....", "http://....", RAPIConfig("http://....")] bulk_config = RAPUBulkConfig(sources) bulk_getter = ProcessFactory.create_getter(bulk_config) async for items in bulk_getter: print(items) """ super().__init__() if not concurrency: concurrency = DefaultVal.main_config["main"].getint("concurrency") self.sources = sources self.interval = interval self.concurrency = concurrency self.session = session_manger._generate_session(concurrency_limit=concurrency) self.filter = filter_ self.return_fail = return_fail self.done_if = done_if self.trim_to_max_limit = trim_to_max_limit self.exclude_filtered_to_max_limit = exclude_filtered_to_max_limit self.persistent = persistent self.persistent_key = persistent_key self.persistent_start_fresh_if_done = persistent_start_fresh_if_done self.persistent_to_disk_if_give_up = persistent_to_disk_if_give_up def __del__(self): if inspect.iscoroutinefunction(self.session.close): if not self.session.closed: if self.session._connector is not None and self.session._connector_owner: self.session._connector.close() self._connector = None else: self.session.close() class RRedisConfig(BaseGetterConfig): def __init__(self, key, key_type="LIST", per_limit=None, max_limit=None, filter_=None, max_retry=None, random_min_sleep=None, random_max_sleep=None, host=None, port=None, db=None, password=None, timeout=None, encoding=None, need_del=None, direction=None, compress=None, **kwargs): """ :param key: redis key to get data :param key_type: redis data type to operate, current only support LIST, HASH :param per_limit: how many items to get per time :param max_limit: get at most max_limit items, if not set, get all :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param filter_: run "transform --help" to see command line interface explanation for detail :param host: redis host -> str :param port: redis port -> int :param db: redis database number -> int :param password: redis password -> int :param timeout: timeout per redis connection -> float :param encoding: redis object encoding -> str :param need_del: whether need to del the key after get object from redis -> boolean :param direction: "L" or "R", left to right or roght to left :param compress: whether compress data use zlib before write to redis -> boolean :param kwargs: Example: redis_config = RRedisConfig("my_key") redis_getter = ProcessFactory.create_getter(redis_config) async for items in redis_getter: print(items) """ super().__init__() # load default value if not random_min_sleep: random_min_sleep = DefaultVal.random_min_sleep if not random_max_sleep: random_max_sleep = DefaultVal.random_max_sleep if not per_limit: per_limit = DefaultVal.per_limit if not max_limit: max_limit = DefaultVal.max_limit if not max_retry: max_retry = DefaultVal.max_retry if host is None: host = DefaultVal.redis_host if port is None: port = DefaultVal.redis_port if db is None: db = DefaultVal.redis_db if password is None: password = DefaultVal.redis_password if timeout is None: timeout = DefaultVal.redis_timeout if encoding is None: encoding = DefaultVal.redis_encoding if direction is None: direction = DefaultVal.redis_direction if need_del is None: need_del = DefaultVal.redis_need_del if compress is None: compress = DefaultVal.redis_compress if not DefaultVal.main_config.has_redis_configured and port <= 0: raise ValueError("You must config redis before using Redis, Please edit configure file: %s" % (DefaultVal.main_config.ini_path, )) if key_type not in ("LIST", "HASH"): raise ValueError("key_type must be one of (%s)" % (str(("LIST", "HASH")), )) if not encoding: raise ValueError("You must specific encoding, since I am going to load each object in json format, " "and treat it as dictionary in python") if not password: password = None self.redis_pool_cli = None self.key = key self.host = host self.port = port self.db = db self.password = password self.encoding = encoding self.timeout = timeout self.key_type = key_type self.per_limit = per_limit self.max_limit = max_limit self.filter = filter_ self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.need_del = need_del self.name = "%s_%s->%s" % (str(host), str(port), str(key)) self.redis_read_method = self.redis_len_method = self.redis_del_method = None self.direction = direction self.compress = compress if key_type == "LIST": self.is_range = True else: self.is_range = False async def get_redis_pool_cli(self): """ :return: an async redis client """ if self.redis_pool_cli is None: kwargs = { "db": self.db, "password": self.password, "encoding": self.encoding, "timeout": self.timeout, "minsize": 1, "maxsize": 3 } if self.compress: del kwargs["encoding"] self.redis_pool_cli = await aioredis.create_redis_pool((self.host, self.port), **kwargs) if self.key_type == "LIST": self.redis_read_method = self.redis_pool_cli.lrange self.redis_len_method = self.redis_pool_cli.llen self.redis_del_method = self.redis_pool_cli.ltrim else: self.redis_read_method = self.redis_pool_cli.hgetall self.redis_len_method = self.redis_pool_cli.hlen self.redis_del_method = self.redis_pool_cli.delete return self.redis_pool_cli class RMySQLConfig(BaseGetterConfig): def __init__(self, table, per_limit=None, max_limit=None, filter_=None, max_retry=None, random_min_sleep=None, random_max_sleep=None, host=None, port=None, user=None, password=None, database=None, encoding=None, loop=None, **kwargs): """ :param table: mysql table :param per_limit: how many items to get per time :param max_limit: get at most max_limit items, if not set, get all :param filter_: run "transform --help" to see command line interface explanation for detail :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param host: mysql host -> str :param port: mysql port -> int :param user: mysql user -> str :param password: mysql password -> str :param database: mysql database -> str :param charset: default utf8 -> str :param loop: async loop instance :param kwargs: Example: mysql_config = RRedisConfig("my_table") redis_getter = ProcessFactory.create_getter(mysql_config) async for items in redis_getter: print(items) """ super().__init__() if not random_min_sleep: random_min_sleep = DefaultVal.random_min_sleep if not random_max_sleep: random_max_sleep = DefaultVal.random_max_sleep if not per_limit: per_limit = DefaultVal.per_limit if not max_limit: max_limit = DefaultVal.max_limit if not max_retry: max_retry = DefaultVal.max_retry if not host: host = DefaultVal.mysql_host if not port: port = DefaultVal.mysql_port if not user: user = DefaultVal.mysql_user if not password: password = DefaultVal.mysql_password if not database: database = DefaultVal.mysql_database if not encoding: encoding = DefaultVal.mysql_encoding if not DefaultVal.main_config.has_mysql_configured and port <= 0: raise ValueError("You must config mysql before using MySQL, Please edit configure file: %s" % (DefaultVal.main_config.ini_path, )) if "aiomysql" not in globals(): raise ValueError("module mysql disabled, please reinstall " "requirements with python version higher than 3.5.3 to enable it") self.table = table self.database = database self.max_limit = max_limit self.per_limit = per_limit self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.filter = filter_ self.name = "%s->%s" % (self.database, self.table) self.host = host self.port = port self.user = user if not password: password = '' self.password = password self.database = database self.encoding = encoding if not loop: loop = asyncio.get_event_loop() self.loop = loop self.mysql_pool_cli = self.connection = self.cursor = None async def get_mysql_pool_cli(self): """ :return: an async mysql client """ if self.mysql_pool_cli is None: self.mysql_pool_cli = await aiomysql.create_pool(host=self.host, port=self.port, user=self.user, password=self.password, db=self.database, loop=self.loop, minsize=1, maxsize=3, charset=self.encoding) self.connection = await self.mysql_pool_cli.acquire() self.cursor = await self.connection.cursor() return self.mysql_pool_cli def free_resource(self): if self.mysql_pool_cli is not None: self.mysql_pool_cli.release(self.connection) self.mysql_pool_cli.close() self.loop.create_task(self.mysql_pool_cli.wait_closed()) self.mysql_pool_cli = self.connection = self.cursor = None class RMongoConfig(BaseGetterConfig): def __init__(self, collection, per_limit=None, max_limit=None, query_body=None, max_retry=None, random_min_sleep=None, random_max_sleep=None, filter_=None, host=None, port=None, username=None, password=None, database=None, **kwargs): """ :param collection: collection name :param per_limit: how many items to get per request :param max_limit: get at most max_limit items, if not set, get all :param query_body: search query, default None, i.e: {'i': {'$lt': 5}} :param return_source: if set to True, will return [item , ..., itemN], item is the "_source" object if set to False, will return whatever elasticsearch return, i.e {"hits": {"total": ...}} :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param filter_: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: mongo_config = RMongoConfig("my_coll") mongo_getter = ProcessFactory.create_getter(mongo_config) async for items in mongo_getter: print(item) """ super().__init__() if not random_min_sleep: random_min_sleep = DefaultVal.random_min_sleep if not random_max_sleep: random_max_sleep = DefaultVal.random_max_sleep if not per_limit: per_limit = DefaultVal.per_limit if not max_limit: max_limit = DefaultVal.max_limit if not max_retry: max_retry = DefaultVal.max_retry if not host: host = DefaultVal.mongo_host if not port: port = DefaultVal.mongo_port if not username: username = DefaultVal.mongo_username if not password: password = DefaultVal.mongo_password if not database: database = DefaultVal.mongo_database if not DefaultVal.main_config.has_mongo_configured: raise ValueError("You must config MongoDB before using MongoDB, Please edit configure file: %s" % (DefaultVal.main_config.ini_path, )) if "motor" not in globals(): raise ValueError("module motor disabled, please reinstall " "requirements in linux") self.collection = collection self.query_body = query_body self.per_limit = per_limit self.max_limit = max_limit self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.filter = filter_ self.host = host self.port = port self.username = username self.password = password self.database = database self.name = "%s->%s" % (self.database, self.collection) self.client = self.cursor = None def get_mongo_cli(self): if self.client is None: kwargs = { "host": self.host, "port": self.port } if self.username: address = "mongodb://%s:%s@%s:%s/%s" % (self.username, self.password, kwargs["host"], str(kwargs["port"]), self.database) self.client = motor.motor_asyncio.AsyncIOMotorClient(address) else: self.client = motor.motor_asyncio.AsyncIOMotorClient(**kwargs) if self.query_body: self.cursor = self.client[self.database][self.collection].find(self.query_body) else: self.cursor = self.client[self.database][self.collection].find() return self.client PKUMԯSS@idataapi_transform/DataProcess/Config/ConfigUtil/WriterConfig.pyimport asyncio import aioredis import inspect try: import aiomysql except Exception as e: pass try: import motor.motor_asyncio except Exception as e: pass from .BaseConfig import BaseWriterConfig from ..ESConfig import get_es_client from ..DefaultValue import DefaultVal class WCSVConfig(BaseWriterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_w, encoding=DefaultVal.default_encoding, headers=None, filter_=None, expand=None, qsn=DefaultVal.qsn, **kwargs): """ :param filename: filename to write :param mode: file open mode, i.e "w" or "a+" :param encoding: file encoding i.e "utf8" :param headers: csv headers in first row, if not set, automatically extract in first bulk of items :param filter_: run "transform --help" to see command line interface explanation for detail :param expand: run "transform --help" to see command line interface explanation for detail :param qsn: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: ... csv_config = WCSVConfig("./result.csv", encoding="utf8", headers=["likeCount", "id", "title"]) with ProcessFactory.create_writer(csv_config) as csv_writer: async for items in es_getter: # do whatever you want with items csv_writer.write(items) """ super().__init__() self.filename = filename self.encoding = encoding self.mode = mode self.headers = headers self.filter = filter_ self.expand = expand self.qsn = qsn class WESConfig(BaseWriterConfig): def __init__(self, indices, doc_type, filter_=None, expand=None, id_hash_func=DefaultVal.default_id_hash_func, appCode=None, actions=None, createDate=None, error_if_fail=True, timeout=None, max_retry=None, random_min_sleep=None, random_max_sleep=None, auto_insert_createDate=True, hosts=None, headers=None, **kwargs): """ :param indices: elasticsearch indices :param doc_type: elasticsearch doc_type :param filter_: run "transform --help" to see command line interface explanation for detail :param expand: run "transform --help" to see command line interface explanation for detail :param id_hash_func: function to generate id_ for each item :param appCode: if not None, add appCode to each item before write to es :param actions: if not None, will set actions to user define actions, else default actions is 'index' :param createDate: if not None, add createDate to each item before write to es :param error_if_fail: if True, log to error if fail to insert to es, else log nothing :param timeout: http connection timeout when connect to es, seconds :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param auto_insert_createDate: whether insert createDate for each item automatic -> boolean :param hosts: elasticsearch hosts, list type, i.e: ["localhost:8888", "127.0.0.2:8889"] :param headers: headers when perform http requests to elasticsearch, dict type, i.e: {"Host": "aaa", "apikey": "bbb"} :param kwargs: Example: ... es_config = WESConfig("post20170630", "news") with ProcessFactory.create_writer(es_config) as es_writer: # asyncio function must call with await await csv_writer.write(items) """ super().__init__() if not random_min_sleep: random_min_sleep = DefaultVal.random_min_sleep if not random_max_sleep: random_max_sleep = DefaultVal.random_max_sleep if not max_retry: max_retry = DefaultVal.max_retry if not DefaultVal.main_config.has_es_configured: raise ValueError("You must config es_hosts before using Elasticsearch, Please edit configure file: %s" % (DefaultVal.main_config.ini_path, )) self.indices = indices self.doc_type = doc_type self.filter = filter_ self.expand = expand self.id_hash_func = id_hash_func self.es_client = get_es_client(hosts=hosts, headers=headers) self.app_code = appCode self.actions = actions self.create_date = createDate self.error_if_fail = error_if_fail self.timeout = timeout self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.auto_insert_createDate = auto_insert_createDate class WJsonConfig(BaseWriterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_w, encoding=DefaultVal.default_encoding, expand=None, filter_=None, new_line=DefaultVal.new_line, **kwargs): """ :param filename: filename to write :param mode: file open mode, i.e "w" or "a+" :param encoding: file encoding i.e "utf8" :param expand: run "transform --help" to see command line interface explanation for detail :param filter_: run "transform --help" to see command line interface explanation for detail :param new_line: new_line seperator for each item, default is "\n" :param kwargs: Example: ... json_config = WJsonConfig("./result.json") with ProcessFactory.create_writer(csv_config) as json_writer: async for items in es_getter: json_writer.write(items) """ super().__init__() self.filename = filename self.mode = mode self.encoding = encoding self.expand = expand self.filter = filter_ self.new_line = new_line class WTXTConfig(BaseWriterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_w, encoding=DefaultVal.default_encoding, expand=None, filter_=None, new_line=DefaultVal.new_line, join_val=DefaultVal.join_val, **kwargs): """ :param filename: filename to write :param mode: file open mode, i.e "w" or "a+" :param encoding: file encoding i.e "utf8" :param expand: run "transform --help" to see command line interface explanation for detail :param filter_: run "transform --help" to see command line interface explanation for detail :param new_line: new_line seperator for each item, default is "\n" :param join_val: space seperator for each key in each item, default is " " :param kwargs: Example: ... txt_config = WTXTConfig("./result.txt") with ProcessFactory.create_writer(txt_config) as txt_writer: async for items in es_getter: txt_writer.write(items) """ super().__init__() self.filename = filename self.mode = mode self.encoding = encoding self.expand = expand self.filter = filter_ self.new_line = new_line self.join_val = join_val class WXLSXConfig(BaseWriterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_w, title=DefaultVal.title, expand=None, filter_=None, headers=None, sheet_index=0, **kwargs): """ :param filename: filename to write :param mode: file open mode, i.e "w" or "a+" :param title: sheet title :param expand: run "transform --help" to see command line interface explanation for detail :param filter_: run "transform --help" to see command line interface explanation for detail :param headers: xlsx headers in first row, if not set, automatically extract in first bulk of items :param sheet_index: which sheet to get, 0 means 0th sheet, only work for append mode :param kwargs: Example: ... xlsx_config = WXLSXConfig("./result.xlsx") with ProcessFactory.create_writer(xlsx_config) as xlsx_writer: async for items in es_getter: xlsx_writer.write(items) """ super().__init__() self.filename = filename self.mode = mode self.title = title self.expand = expand self.filter = filter_ self.headers = headers self.sheet_index = sheet_index class WRedisConfig(BaseWriterConfig): def __init__(self, key, key_type="LIST", filter_=None, host=None, port=None, db=None, password=None, timeout=None, encoding=None, direction=None, max_retry=None, random_min_sleep=None, random_max_sleep=None, compress=None, **kwargs): """ :param key: redis key to write data :param key_type: redis data type to operate, current only support LIST, HASH :param filter_: run "transform --help" to see command line interface explanation for detail :param host: redis host -> str :param port: redis port -> int :param db: redis database number -> int :param password: redis password -> int :param timeout: timeout per redis connection -> float :param encoding: redis object encoding -> str :param direction: "L" or "R", lpush or rpush :param compress: whether compress data use zlib before write to redis -> boolean :param kwargs: Example: redis_config = WRedisConfig("my_key") with ProcessFactory.create_writer(redis_config) as redis_writer: async for items in es_getter: await redis_writer.write(items) """ super().__init__() # load default value if not random_min_sleep: random_min_sleep = DefaultVal.random_min_sleep if not random_max_sleep: random_max_sleep = DefaultVal.random_max_sleep if not max_retry: max_retry = DefaultVal.max_retry if host is None: host = DefaultVal.redis_host if port is None: port = DefaultVal.redis_port if db is None: db = DefaultVal.redis_db if password is None: password = DefaultVal.redis_password if timeout is None: timeout = DefaultVal.redis_timeout if encoding is None: encoding = DefaultVal.redis_encoding if direction is None: direction = DefaultVal.redis_direction if compress is None: compress = DefaultVal.redis_compress # check value if not DefaultVal.main_config.has_redis_configured and port <= 0: raise ValueError("You must config redis before using Redis, Please edit configure file: %s" % (DefaultVal.main_config.ini_path, )) if key_type not in ("LIST", "HASH"): raise ValueError("key_type must be one of (%s)" % (str(("LIST", )), )) if not encoding: raise ValueError("You must specific encoding, since I am going to load each object in json format, " "and treat it as dictionary in python") if not password: password = None self.redis_pool_cli = None self.key = key self.host = host self.port = port self.db = db self.password = password self.encoding = encoding self.timeout = timeout self.key_type = key_type self.filter = filter_ self.name = "%s_%s->%s" % (str(host), str(port), str(key)) self.redis_write_method = None self.direction = direction self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.compress = compress if key_type == "LIST": self.is_range = True else: self.is_range = False async def get_redis_pool_cli(self): """ :return: an async redis client """ if self.redis_pool_cli is None: kwargs = { "db": self.db, "password": self.password, "encoding": self.encoding, "timeout": self.timeout, "minsize": 1, "maxsize": 3 } if self.compress: del kwargs["encoding"] self.redis_pool_cli = await aioredis.create_redis_pool((self.host, self.port), **kwargs) if self.key_type == "LIST": if self.direction == "L": self.redis_write_method = self.redis_pool_cli.lpush else: self.redis_write_method = self.redis_pool_cli.rpush else: self.redis_write_method = self.redis_pool_cli.hset return self.redis_pool_cli class WMySQLConfig(BaseWriterConfig): def __init__(self, table, filter_=None, max_retry=None, random_min_sleep=None, random_max_sleep=None, host=None, port=None, user=None, password=None, database=None, encoding=None, loop=None, **kwargs): """ :param table: mysql table :param filter_: run "transform --help" to see command line interface explanation for detail :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param host: mysql host -> str :param port: mysql port -> int :param user: mysql user -> str :param password: mysql password -> str :param database: mysql database -> str :param charset: default utf8 -> str :param loop: async loop instance :param kwargs: Example: mysql_config = WMySQLConfig("my_table") mysql_writer = ProcessFactory.create_writer(mysql_config) async for items in redis_getter: await mysql_writer.write(items) """ super().__init__() if not random_min_sleep: random_min_sleep = DefaultVal.random_min_sleep if not random_max_sleep: random_max_sleep = DefaultVal.random_max_sleep if not max_retry: max_retry = DefaultVal.max_retry if not host: host = DefaultVal.mysql_host if not port: port = DefaultVal.mysql_port if not user: user = DefaultVal.mysql_user if not password: password = DefaultVal.mysql_password if not database: database = DefaultVal.mysql_database if not encoding: encoding = DefaultVal.mysql_encoding if not DefaultVal.main_config.has_mysql_configured and port <= 0: raise ValueError("You must config mysql before using MySQL, Please edit configure file: %s" % (DefaultVal.main_config.ini_path, )) if "aiomysql" not in globals(): raise ValueError("module mysql disabled, please reinstall " "requirements with python version higher than 3.5.3 to enable it") self.table = table self.database = database self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.filter = filter_ self.name = "%s->%s" % (self.database, self.table) self.host = host self.port = port self.user = user if not password: password = '' self.password = password self.database = database self.encoding = encoding if not loop: loop = asyncio.get_event_loop() self.loop = loop self.mysql_pool_cli = self.connection = self.cursor = None async def get_mysql_pool_cli(self): """ :return: an async mysql client """ if self.mysql_pool_cli is None: self.mysql_pool_cli = await aiomysql.create_pool(host=self.host, port=self.port, user=self.user, password=self.password, db=self.database, loop=self.loop, minsize=1, maxsize=3, charset=self.encoding) self.connection = await self.mysql_pool_cli.acquire() self.cursor = await self.connection.cursor() return self.mysql_pool_cli def free_resource(self): if self.mysql_pool_cli is not None: self.mysql_pool_cli.release(self.connection) self.mysql_pool_cli.close() self.loop.create_task(self.mysql_pool_cli.wait_closed()) self.mysql_pool_cli = self.connection = self.cursor = None class WMongoConfig(BaseWriterConfig): def __init__(self, collection, id_hash_func=DefaultVal.default_id_hash_func, max_retry=None, random_min_sleep=None, random_max_sleep=None, filter_=None, host=None, port=None, username=None, password=None, database=None, auto_insert_createDate=False, createDate=None, **kwargs): """ :param collection: collection name :param id_hash_func: function to generate id_ for each item, only if "_id" not in item will I use 'id_hash_func' to generate "_id" :param return_source: if set to True, will return [item , ..., itemN], item is the "_source" object if set to False, will return whatever elasticsearch return, i.e {"hits": {"total": ...}} :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param filter_: run "transform --help" to see command line interface explanation for detail :param host: mongodb host -> str :param port: mongodb port -> int :param user: mongodb user -> str :param password: mongodb password -> str :param database: mongodb database -> str :param createDate: if not None, add createDate to each item before write to mongodb :param auto_insert_createDate: whether insert createDate for each item automatic -> boolean :param kwargs: Example: data = [json_obj, json_obj, json_obj] mongo_config = WMongoConfig("my_coll") async with ProcessFactory.create_writer(mongo_config) as mongo_writer: await mongo_writer.write(data) """ super().__init__() if not random_min_sleep: random_min_sleep = DefaultVal.random_min_sleep if not random_max_sleep: random_max_sleep = DefaultVal.random_max_sleep if not max_retry: max_retry = DefaultVal.max_retry if not host: host = DefaultVal.mongo_host if not port: port = DefaultVal.mongo_port if not username: username = DefaultVal.mongo_username if not password: password = DefaultVal.mongo_password if not database: database = DefaultVal.mongo_database if not DefaultVal.main_config.has_mongo_configured: raise ValueError("You must config MongoDB before using MongoDB, Please edit configure file: %s" % (DefaultVal.main_config.ini_path, )) if "motor" not in globals(): raise ValueError("module motor disabled, please reinstall " "requirements in linux") self.collection = collection self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.filter = filter_ self.host = host self.port = port self.username = username self.password = password self.database = database self.name = "%s->%s" % (self.database, self.collection) self.id_hash_func = id_hash_func self.auto_insert_createDate = auto_insert_createDate self.createDate = createDate self.client = self.collection_cli = None def get_mongo_cli(self): if self.client is None: kwargs = { "host": self.host, "port": self.port } if self.username: self.client = motor.motor_asyncio.AsyncIOMotorClient( "mongodb://%s:%s@%s:%s/%s" % (self.username, self.password, kwargs["host"], str(kwargs["port"]), self.database)) else: self.client = motor.motor_asyncio.AsyncIOMotorClient(**kwargs) self.collection_cli = self.client[self.database][self.collection] return self.client PK`K<idataapi_transform/DataProcess/Config/ConfigUtil/__init__.pyPK pMDEnY<<6idataapi_transform/DataProcess/DataGetter/APIGetter.pyimport re import json import hashlib import random import logging import asyncio import inspect import traceback from .BaseGetter import BaseGetter from ..Config.ConfigUtil.GetterConfig import RAPIConfig from ..Config.ConfigUtil.AsyncHelper import AsyncGenerator from ..PersistentUtil.PersistentWriter import PersistentWriter headers = { "Accept-Encoding": "gzip", # "Connection": "close" } class SourceObject(object): def __init__(self, response, tag, source, error_url, post_body): """ When error occur :param response: error response body :param tag: tag user pass in :param source: source url user pass in :param error_url: current url elicit error :param post_body: HTTP post body """ self.response = response self.tag = tag self.source = source self.error_url = error_url self.post_body = post_body class APIGetter(BaseGetter): def __init__(self, config): super().__init__() self.config = config self.base_url = self.config.source self.retry_count = 0 self.responses = list() self.bad_responses = list() self.done = False self.page_token = "" self.miss_count = 0 self.total_count = 0 self.call_back = self.async_call_back = None if self.config.call_back is not None: if inspect.iscoroutinefunction(self.config.call_back): self.async_call_back = self.config.call_back else: self.call_back = self.config.call_back self.request_time = 0 self.method = "POST" if self.config.post_body else "GET" self.give_up = False def init_val(self): self.base_url = self.config.source self.retry_count = 0 self.responses = list() self.bad_responses = list() self.done = False self.page_token = "" self.miss_count = 0 self.total_count = 0 self.call_back = self.async_call_back = None self.request_time = 0 self.config.persistent_writer = None self.give_up = False def generate_sub_func(self): def sub_func(match): return match.group(1) + self.page_token + match.group(3) return sub_func def update_base_url(self, key="pageToken"): if self.base_url[-1] == "/": self.base_url = self.base_url[:-1] elif self.base_url[-1] == "?": self.base_url = self.base_url[:-1] key += "=" if key not in self.base_url: if "?" not in self.base_url: self.base_url = self.base_url + "?" + key + self.page_token else: self.base_url = self.base_url + "&" + key + self.page_token else: self.base_url = re.sub("(" + key + ")(.+?)($|&)", self.generate_sub_func(), self.base_url) def __aiter__(self): return self async def __anext__(self): if self.done: logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.source, self.total_count, self.miss_count)) if self.config.persistent_writer and (not self.give_up or self.config.persistent_to_disk_if_give_up): self.config.persistent_writer.add(self.config.source) self.init_val() raise StopAsyncIteration while True: result = None # for SourceObject try: resp = await self.config.session._request(self.method, self.base_url, headers=headers, data=self.config.post_body) text = await resp.text() # print(text) result = json.loads(text) if "data" not in result: if "retcode" not in result or result["retcode"] not in self.config.success_ret_code: raise ValueError("Bad retcode: %s" % (str(result["retcode"]) if "retcode" in result else str(result), )) except Exception as e: self.retry_count += 1 logging.error("retry: %d, %s: %s" % (self.retry_count, str(e), self.base_url)) await asyncio.sleep(random.uniform(self.config.random_min_sleep, self.config.random_max_sleep)) if self.retry_count < self.config.max_retry: continue else: # fail logging.error("Give up, After retry: %d times, Unable to get url: %s, total get %d items, " "total filtered: %d items, error: %s" % (self.config.max_retry, self.base_url, self.total_count, self.miss_count, str(traceback.format_exc()) if "Bad retcode" not in str(e) else str(e))) self.done = self.give_up = True if self.config.return_fail: self.bad_responses.append(SourceObject(result, self.config.tag, self.config.source, self.base_url, self.config.post_body)) return await self.clear_and_return() elif self.responses: return await self.clear_and_return() else: return await self.__anext__() self.request_time += 1 if "data" in result: # success self.retry_count = 0 origin_length = len(result["data"]) if self.config.filter: curr_response = [self.config.filter(i) for i in result["data"]] curr_response = [i for i in curr_response if i] self.miss_count += origin_length - len(curr_response) else: curr_response = result["data"] self.total_count += origin_length if self.config.exclude_filtered_to_max_limit else len(curr_response) self.responses.extend(curr_response) # trim_to_max_limit if self.config.trim_to_max_limit and self.config.max_limit and self.total_count > self.config.max_limit: need_trim_items = self.total_count - self.config.max_limit self.responses = self.responses[:-need_trim_items] logging.info("trim %d items to fit max_limit: %d" % (need_trim_items, self.config.max_limit)) self.total_count -= need_trim_items # check if done if self.config.done_if is not None and self.config.done_if(curr_response): self.done = True return await self.clear_and_return() # get next page if success, retry if fail if "pageToken" in result: if not result["pageToken"]: self.done = True if self.need_return(): return await self.clear_and_return() self.page_token = str(result["pageToken"]) self.update_base_url() elif "retcode" in result and result["retcode"] in self.config.success_ret_code: self.done = True if self.need_return(): return await self.clear_and_return() return await self.__anext__() else: self.retry_count += 1 if self.retry_count >= self.config.max_retry: logging.error("Give up, After retry: %d times, Unable to get url: %s, total get %d items, " "total filtered: %d items" % (self.config.max_retry, self.base_url, self.total_count, self.miss_count)) self.done = self.give_up = True if self.need_return(): return await self.clear_and_return() await asyncio.sleep(random.uniform(self.config.random_min_sleep, self.config.random_max_sleep)) return await self.__anext__() if self.config.max_limit and self.total_count >= self.config.max_limit: self.done = True return await self.clear_and_return() elif len(self.responses) >= self.config.per_limit: return await self.clear_and_return() elif self.done: # buffer has empty data, and done fetching return await self.__anext__() if self.request_time % self.config.report_interval == 0: logging.info("After request %d pages, current item count(%d) < per_limit(%d), latest request page: %s" % (self.request_time, len(self.responses), self.config.per_limit, self.base_url)) def __iter__(self): raise ValueError("APIGetter must be used with async generator, not normal generator") async def clear_and_return(self): self.request_time = 0 if self.config.return_fail: resp, bad_resp = self.responses, self.bad_responses self.responses, self.bad_responses = list(), list() if self.call_back is not None: r = self.call_back(resp, bad_resp) if inspect.iscoroutine(r): # bind function for coroutine self.async_call_back = self.call_back self.call_back = None return await r return r elif self.async_call_back is not None: return await self.async_call_back(resp, bad_resp) else: return resp, bad_resp else: resp = self.responses self.responses = list() if self.call_back is not None: r = self.call_back(resp) if inspect.iscoroutine(r): # bind function for coroutine self.async_call_back = self.call_back self.call_back = None return await r return r elif self.async_call_back is not None: return await self.async_call_back(resp) else: return resp def need_return(self): return self.responses or (self.config.return_fail and (self.responses or self.bad_responses)) class APIBulkGetter(BaseGetter): def __init__(self, config): super().__init__() self.config = config self.async_api_configs = AsyncGenerator(self.config.sources, self.to_config) self.pending_tasks = list() self.buffers = list() self.bad_buffers = list() self.success_task = 0 self.curr_size = 0 self.curr_bad_size = 0 self.persistent_writer = None self.skip_num = 0 def to_config(self, item): if isinstance(item, RAPIConfig): r = item else: r = RAPIConfig(item, session=self.config.session, filter_=self.config.filter, return_fail=self.config.return_fail, done_if=self.config.done_if, trim_to_max_limit=self.config.trim_to_max_limit, exclude_filtered_to_max_limit=self.config.exclude_filtered_to_max_limit, persistent_to_disk_if_give_up=self.config.persistent_to_disk_if_give_up) # persistent if self.config.persistent: if not self.config.persistent_key: self.config.persistent_key = hashlib.md5(r.source.encode("utf8")).hexdigest() if self.persistent_writer is None: self.persistent_writer = PersistentWriter(self.config.persistent_key) r.persistent_writer = self.persistent_writer return r async def fetch_items(self, api_config): if api_config.return_fail: async for items, bad_items in APIGetter(api_config): if self.config.return_fail: self.bad_buffers.extend(bad_items) self.buffers.extend(items) else: async for items in APIGetter(api_config): self.buffers.extend(items) async def fill_tasks(self): if len(self.pending_tasks) >= self.config.concurrency: return async for api_config in self.async_api_configs: # skip already done task if self.config.persistent: if api_config.source in self.persistent_writer: self.skip_num += 1 continue self.pending_tasks.append(self.fetch_items(api_config)) if len(self.pending_tasks) >= self.config.concurrency: self.persistent() return self.persistent() def __aiter__(self): return self async def __anext__(self): await self.fill_tasks() while self.pending_tasks: done, pending = await asyncio.wait(self.pending_tasks, timeout=self.config.interval) self.pending_tasks = list(pending) self.success_task += len(done) if self.buffers or (self.config.return_fail and (self.buffers or self.bad_buffers)): return self.clear_and_return() else: # after interval seconds, no item fetched await self.fill_tasks() log_str = "After %.2f seconds, no new item fetched, current done task: %d, pending tasks: %d" % (float(self.config.interval), self.success_task, len(self.pending_tasks)) if self.config.persistent: log_str += ", skip %d already finished tasks with persistent mode on" % (self.skip_num, ) logging.info(log_str) continue ret_log = "APIBulkGetter Done, total perform: %d tasks, fetch: %d items" % (self.success_task, self.curr_size) if self.config.return_fail: ret_log += ", fail: %d items" % (self.curr_bad_size, ) if self.config.persistent: ret_log += ", skip %d already finished tasks with persistent mode on" % (self.skip_num,) logging.info(ret_log) if self.config.persistent: self.persistent_writer.clear(self.config.persistent_start_fresh_if_done) raise StopAsyncIteration def __iter__(self): raise ValueError("APIBulkGetter must be used with async generator, not normal generator") def clear_and_return(self): if self.config.return_fail: buffers, bad_buffers = self.buffers, self.bad_buffers self.curr_size += len(self.buffers) self.curr_bad_size += len(self.bad_buffers) self.buffers, self.bad_buffers = list(), list() return buffers, bad_buffers else: buffers = self.buffers self.curr_size += len(self.buffers) self.buffers = list() return buffers def persistent(self): # persistent task to file if self.config.persistent: self.persistent_writer.write() # logging.info("persistent mode on, after sync, totally skip %d already finished tasks" % (self.skip_num,)) PKYKUYUU7idataapi_transform/DataProcess/DataGetter/BaseGetter.pyimport abc from ..Meta.BaseDataProcess import BaseDataProcess class BaseGetter(BaseDataProcess, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): """ :param config config contains attribute: source: where to read data per_limit: return at most per_limit data each time max_limit: return at most max_limit data total """ pass @abc.abstractmethod def __aiter__(self): return self @abc.abstractmethod async def __anext__(self): pass PKCM2 2 6idataapi_transform/DataProcess/DataGetter/CSVGetter.pyimport csv import sys import logging from .BaseGetter import BaseGetter if sys.platform == "linux": csv.field_size_limit(sys.maxsize) class CSVGetter(BaseGetter): def __init__(self, config): super().__init__() self.config = config self.f_in = open(self.config.filename, self.config.mode, encoding=self.config.encoding) self.reader = csv.DictReader(self.f_in) self.done = False self.responses = list() self.miss_count = 0 self.total_count = 0 def init_val(self): self.done = False self.responses = list() self.f_in.seek(0, 0) self.miss_count = 0 self.total_count = 0 def __aiter__(self): return self async def __anext__(self): if self.done: logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration for row in self.reader: if self.config.max_limit and self.total_count > self.config.max_limit: self.done = True return self.clear_and_return() self.total_count += 1 if self.config.filter: row = self.config.filter(row) if not row: self.miss_count += 1 continue self.responses.append(row) if len(self.responses) > self.config.per_limit: return self.clear_and_return() if self.responses: self.done = True return self.clear_and_return() logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration def __iter__(self): for row in self.reader: if self.config.max_limit and self.total_count > self.config.max_limit: self.done = True yield self.clear_and_return() break self.total_count += 1 if self.config.filter: row = self.config.filter(row) if not row: self.miss_count += 1 continue self.responses.append(row) if len(self.responses) > self.config.per_limit: yield self.clear_and_return() if self.responses: yield self.responses logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() def clear_and_return(self): resp = self.responses self.responses = list() return resp PKEV}M^mm5idataapi_transform/DataProcess/DataGetter/ESGetter.pyimport asyncio import random import logging import traceback from .BaseGetter import BaseGetter class ESScrollGetter(BaseGetter): def __init__(self, config): super().__init__(self) self.config = config self.es_client = config.es_client self.total_size = None self.result = None self.scroll_id = None self.miss_count = 0 self.total_count = 0 def __aiter__(self): return self def init_val(self): self.total_size = None self.result = None self.scroll_id = None self.miss_count = 0 self.total_count = 0 async def __anext__(self, retry=1): if self.total_size is None: self.result = await self.es_client.search(self.config.indices, self.config.doc_type, scroll=self.config.scroll, body=self.config.query_body) self.total_size = self.result['hits']['total'] self.total_size = self.config.max_limit if (self.config.max_limit and self.config.max_limit < self.result['hits']['total']) else self.total_size self.total_count += len(self.result['hits']['hits']) logging.info("Get %d items from %s, percentage: %.2f%%" % (len(self.result['hits']['hits']), self.config.indices + "->" + self.config.doc_type, (self.total_count / self.total_size * 100) if self.total_size else 0)) origin_length = len(self.result['hits']['hits']) if self.config.return_source: results = [i["_source"] for i in self.result['hits']['hits']] else: results = self.result if self.config.filter: results = [self.config.filter(i) for i in results] results = [i for i in results if i] self.miss_count += origin_length - len(results) self.get_score_id_and_clear_result() return results if self.scroll_id and self.total_count < self.total_size: try: self.result = await self.es_client.scroll(scroll_id=self.scroll_id, scroll=self.config.scroll) except Exception as e: if retry < self.config.max_retry: logging.error("retry: %d, %s" % (retry, str(e))) await asyncio.sleep(random.uniform(self.config.random_min_sleep, self.config.random_max_sleep)) return await self.__anext__(retry+1) else: logging.error("Give up es getter, After retry: %d times, still fail to get result: %s, " "total get %d items, total filtered: %d items, reason: %s" % (self.config.max_retry, self.config.indices + "->" + self.config.doc_type, self.total_count, self.miss_count, traceback.format_exc())) raise StopAsyncIteration logging.info("Get %d items from %s, filtered: %d items, percentage: %.2f%%" % (len(self.result['hits']['hits']), self.config.indices + "->" + self.config.doc_type, self.miss_count, (self.total_count / self.total_size * 100) if self.total_size else 0)) origin_length = len(self.result['hits']['hits']) self.total_count += origin_length if self.config.return_source: results = [i["_source"] for i in self.result['hits']['hits']] else: results = self.result if self.config.filter: results = [self.config.filter(i) for i in results] results = [i for i in results if i] self.miss_count += origin_length - len(results) self.get_score_id_and_clear_result() if origin_length > 0: return results else: # if scroll empty item, means no more next page logging.info("empty result, terminating scroll, scroll id: %s" % (str(self.scroll_id), )) logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.indices + "->" + self.config.doc_type, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration async def delete_all(self): """ inefficient delete """ body = { "query": { "match_all": {} } } result = await self.config.es_client.delete_by_query(index=self.config.indices, doc_type=self.config.doc_type, body=body, params={"conflicts": "proceed"}) return result def __iter__(self): raise ValueError("ESGetter must be used with async generator, not normal generator") def get_score_id_and_clear_result(self): if "_scroll_id" in self.result and self.result["_scroll_id"]: self.scroll_id = self.result["_scroll_id"] else: self.scroll_id = None self.result = dict() PKCM?S4 7idataapi_transform/DataProcess/DataGetter/JsonGetter.pyimport json import logging from .BaseGetter import BaseGetter class JsonGetter(BaseGetter): def __init__(self, config): super().__init__(self) self.config = config self.responses = list() self.done = False self.f_in = open(self.config.filename, self.config.mode, encoding=self.config.encoding) self.miss_count = 0 self.total_count = 0 def init_val(self): self.responses = list() self.done = False self.f_in.seek(0, 0) self.miss_count = 0 self.total_count = 0 def __aiter__(self): return self async def __anext__(self): if self.done: logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration for line in self.f_in: if self.config.max_limit and self.total_count > self.config.max_limit: self.done = True return self.clear_and_return() self.total_count += 1 try: json_obj = json.loads(line) except json.decoder.JSONDecodeError: logging.error("JSONDecodeError. give up. line: %d" % (self.total_count, )) continue if self.config.filter: json_obj = self.config.filter(json_obj) if not json_obj: self.miss_count += 1 continue self.responses.append(json_obj) if len(self.responses) > self.config.per_limit: return self.clear_and_return() self.done = True if self.responses: return self.clear_and_return() logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration def __iter__(self): for line in self.f_in: if self.config.max_limit and self.total_count > self.config.max_limit: self.done = True yield self.clear_and_return() break self.total_count += 1 try: json_obj = json.loads(line) except json.decoder.JSONDecodeError: logging.error("JSONDecodeError. give up. line: %d" % (self.total_count, )) continue if self.config.filter: json_obj = self.config.filter(json_obj) if not json_obj: self.miss_count += 1 continue self.responses.append(json_obj) if len(self.responses) > self.config.per_limit: yield self.clear_and_return() if self.responses: yield self.clear_and_return() logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() def __del__(self): self.f_in.close() def clear_and_return(self): resp = self.responses self.responses = list() return resp PKEV}MCoL8idataapi_transform/DataProcess/DataGetter/MongoGetter.pyimport asyncio import traceback import random import logging from .BaseGetter import BaseGetter class MongoGetter(BaseGetter): def __init__(self, config): super().__init__(self) self.config = config self.responses = list() self.miss_count = 0 self.total_count = 0 self.total_size = None self.need_finish = False def init_val(self): self.responses = list() self.miss_count = 0 self.total_count = 0 self.total_size = None self.need_finish = False def __aiter__(self): return self async def __anext__(self): self.config.get_mongo_cli() # init mongo pool if self.need_finish: await self.finish() if self.total_size is None: self.total_size = await self.get_total_size() if self.total_count < self.total_size: await self.fetch_per_limit() return self.clear_and_return() # reach here, means done await self.finish() def __iter__(self): raise ValueError("MongoGetter must be used with async generator, not normal generator") async def finish(self): logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.name, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration async def get_total_size(self): if hasattr(self.config.cursor, "count"): size = await self.config.cursor.count() else: size = await self.config.client[self.config.database][self.config.collection].count_documents({} if not self.config.query_body else self.config.query_body) size = min(size, self.config.max_limit if self.config.max_limit is not None else size) if size == 0: await self.finish() return size async def fetch_per_limit(self): curr_size = 0 try_time = 0 get_all = True while try_time < self.config.max_retry: try: async for document in self.config.cursor: curr_size += 1 self.responses.append(document) if curr_size >= self.config.per_limit: get_all = False break if get_all: # get all item if self.total_count + curr_size < self.total_size: logging.error("get all items: %d, but not reach 'total_size': %d" % (self.total_count + curr_size, self.total_size)) self.need_finish = True break except Exception as e: try_time += 1 if try_time < self.config.max_retry: logging.error("retry: %d, %s" % (try_time, str(e))) await asyncio.sleep(random.uniform(self.config.random_min_sleep, self.config.random_max_sleep)) else: logging.error("Give up MongoGetter getter: %s, After retry: %d times, still fail, " "total get %d items, total filtered: %d items, reason: %s" % (self.config.name, self.config.max_retry, self.total_count, self.miss_count, str(traceback.format_exc()))) self.need_finish = True self.total_count += len(self.responses) curr_miss_count = 0 if self.config.filter: target_results = list() for each in self.responses: each = self.config.filter(each) if each: target_results.append(each) else: curr_miss_count += 1 self.responses = target_results self.miss_count += curr_miss_count logging.info("Get %d items from %s, filtered: %d items, percentage: %.2f%%" % (len(self.responses), self.config.name, curr_miss_count, (self.total_count / self.total_size * 100) if self.total_size else 0)) def clear_and_return(self): resp = self.responses self.responses = list() return resp PKEV}M-8idataapi_transform/DataProcess/DataGetter/MySQLGetter.pyimport json import asyncio import traceback import random import logging from .BaseGetter import BaseGetter class MySQLGetter(BaseGetter): def __init__(self, config): super().__init__(self) self.config = config self.responses = list() self.miss_count = 0 self.total_count = 0 self.total_size = None self.key_fields = list() self.key_fields_map = dict() self.need_finish = False def init_val(self): self.responses = list() self.miss_count = 0 self.total_count = 0 self.total_size = None self.key_fields = list() self.key_fields_map = dict() self.need_finish = False def __aiter__(self): return self async def __anext__(self): await self.config.get_mysql_pool_cli() # init mysql pool if self.need_finish: await self.finish() if self.total_size is None: self.total_size, self.key_fields = await self.get_total_size_and_key_field() if self.total_count < self.total_size: await self.fetch_per_limit() return self.clear_and_return() # reach here, means done await self.finish() def __iter__(self): raise ValueError("MySQLGetter must be used with async generator, not normal generator") async def finish(self): logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.name, self.total_count, self.miss_count)) self.init_val() self.config.free_resource() raise StopAsyncIteration async def get_total_size_and_key_field(self): await self.config.cursor.execute("DESC %s" % (self.config.table, )) result = await self.config.cursor.fetchall() field = result[0][0] await self.config.cursor.execute("select count(%s) from %s" % (field, self.config.table)) result = await self.config.cursor.fetchone() # key field await self.config.cursor.execute("DESC %s" % (self.config.table, )) results = await self.config.cursor.fetchall() key_fields = list() for each in results: key_fields.append(each[0]) if "tinyint" in each[1]: self.key_fields_map[each[0]] = bool elif "text" in each[1]: self.key_fields_map[each[0]] = str # or json key_fields = list(i[0] for i in results) return result[0], key_fields async def fetch_per_limit(self): results = list() try_time = 0 while try_time < self.config.max_retry: try: await self.config.cursor.execute("SELECT * FROM %s LIMIT %d,%d" % (self.config.table, self.total_count, self.config.per_limit)) results = await self.config.cursor.fetchall() break except Exception as e: try_time += 1 if try_time < self.config.max_retry: logging.error("retry: %d, %s" % (try_time, str(e))) await asyncio.sleep(random.uniform(self.config.random_min_sleep, self.config.random_max_sleep)) else: logging.error("Give up MySQL getter: %s, After retry: %d times, still fail, " "total get %d items, total filtered: %d items, reason: %s" % (self.config.name, self.config.max_retry, self.total_count, self.miss_count, str(traceback.format_exc()))) self.need_finish = True self.responses = [self.decode(i) for i in results] curr_miss_count = 0 if self.config.filter: target_results = list() for each in results: each = self.config.filter(each) if each: target_results.append(each) else: curr_miss_count += 1 self.responses = target_results self.miss_count += curr_miss_count self.total_count += len(results) logging.info("Get %d items from %s, filtered: %d items, percentage: %.2f%%" % (len(results), self.config.name, curr_miss_count, (self.total_count / self.total_size * 100) if self.total_size else 0)) if self.total_count >= self.config.max_limit: self.need_finish = True return def decode(self, item): """ :param item: tuple :return: dict """ ret_dict = dict() index = 0 for key in self.key_fields: if key in self.key_fields_map: if self.key_fields_map[key] is bool: ret_dict[key] = bool(item[index]) elif item[index] is None: ret_dict[key] = None elif item[index][0] in ("{", "["): try: val = json.loads(item[index]) except json.decoder.JSONDecodeError: val = item[index] ret_dict[key] = val else: ret_dict[key] = item[index] else: ret_dict[key] = item[index] index += 1 return ret_dict def clear_and_return(self): resp = self.responses self.responses = list() return resp PKEV}MA נ8idataapi_transform/DataProcess/DataGetter/RedisGetter.pyimport asyncio import random import logging import traceback import json import zlib from .BaseGetter import BaseGetter class RedisGetter(BaseGetter): def __init__(self, config): super().__init__(self) self.config = config self.is_range = self.config.is_range self.need_del = self.config.need_del self.responses = list() self.done = False self.total_size = None self.miss_count = 0 self.total_count = 0 self.redis_object_length = 0 def init_val(self): self.responses = list() self.done = False self.miss_count = 0 self.total_count = 0 self.redis_object_length = 0 self.total_size = None def decode(self, loaded_object): if self.config.compress: return zlib.decompress(loaded_object).decode(self.config.encoding) else: return json.loads(loaded_object) def __aiter__(self): return self async def __anext__(self, retry=1): await self.config.get_redis_pool_cli() # init redis pool if self.is_range and self.total_size is None: self.redis_object_length = await self.config.redis_len_method(self.config.key) self.total_size = self.config.max_limit if (self.config.max_limit and self.config.max_limit < self.redis_object_length) else self.redis_object_length if self.done: logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.name, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration if self.is_range: if self.config.direction == "L": left = self.total_count right = self.total_count + self.config.per_limit - 1 else: left = self.total_size - self.config.per_limit - 1 if left < 0: left = 0 right = left + self.config.per_limit try: self.responses = await self.config.redis_read_method(self.config.key, left, right) self.responses = [self.decode(i) for i in self.responses] except Exception as e: if retry < self.config.max_retry: logging.error("retry: %d, %s" % (retry, str(e))) await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) return await self.__anext__(retry+1) else: logging.error("Give up redis getter, After retry: %d times, still fail to get key: %s, " "total get %d items, total filtered: %d items, error: %s" % (self.config.max_retry, self.config.key, self.total_count, self.miss_count, str(traceback.format_exc()))) raise StopAsyncIteration if len(self.responses) < self.config.per_limit or not self.responses or self.total_count + len(self.responses) >= self.total_size: self.done = True if self.need_del: await self.config.redis_del_method(self.config.key, 0, -1) else: try: self.responses = await self.config.redis_read_method(self.config.key) self.responses = [self.decode(i) for i in self.responses.values()][:self.total_size] except Exception as e: if retry < self.config.max_retry: logging.error("retry: %d, %s" % (retry, str(e))) await asyncio.sleep(random.uniform(self.config.random_min_sleep, self.config.random_max_sleep)) return await self.__anext__(retry+1) else: logging.error("Give up redis getter, After retry: %d times, still fail to get key: %s, " "total get %d items, total filtered: %d items, reason: %s" % (self.config.max_retry, self.config.key, self.total_count, self.miss_count, str(traceback.format_exc()))) raise StopAsyncIteration if self.config.max_limit: self.responses = self.responses[:self.config.max_limit] self.done = True if self.need_del: await self.config.redis_del_method(self.config.key) current_response_length = len(self.responses) curr_miss_count = 0 self.total_count += current_response_length if self.config.filter: target_responses = list() for i in self.responses: if self.config.filter: i = self.config.filter(i) if i: target_responses.append(i) else: curr_miss_count += 1 self.responses = target_responses self.miss_count += curr_miss_count if self.is_range: logging.info("Get %d items from %s, filtered: %d items, percentage: %.2f%%" % (current_response_length, self.config.name, curr_miss_count, (self.total_count / self.total_size * 100) if self.total_size else 0)) return self.clear_and_return() def __iter__(self): raise ValueError("RedisGetter must be used with async generator, not normal generator") def clear_and_return(self): resp = self.responses self.responses = list() return resp PK)cLl<<7idataapi_transform/DataProcess/DataGetter/XLSXGetter.pyimport logging from openpyxl import load_workbook from .BaseGetter import BaseGetter class XLSXGetter(BaseGetter): def __init__(self, config): super().__init__() self.config = config self.wb = load_workbook(filename=self.config.filename, read_only=True) if not self.wb.worksheets: raise ValueError("Empty file: %s" % (self.config.filename, )) self.sheet = self.wb.worksheets[self.config.sheet_index] self.row_iter = self.sheet.rows self.headers = self.generate_headers() self.max_row = self.sheet.max_row if self.config.max_limit and self.config.max_limit > self.max_row: self.max_row = self.config.max_limit + 1 # add first headers self.row_num = 0 self.responses = list() self.curr_size = 0 self.done = False self.miss_count = 0 self.total_count = 0 def init_val(self): self.row_num = 0 self.responses = list() self.curr_size = 0 self.done = False self.miss_count = 0 self.total_count = 0 self.row_iter = self.sheet.rows def __aiter__(self): return self async def __anext__(self): if self.done: logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration while self.row_num < self.max_row: if self.row_num == 0: self.row_num += 1 continue self.row_num += 1 self.total_count += 1 row = self.get_next_row() if self.config.filter: row = self.config.filter(row) if not row: self.miss_count += 1 continue self.responses.append(row) if len(self.responses) > self.config.per_limit: self.curr_size += len(self.responses) return self.clear_and_return() if self.responses: self.done = True return self.clear_and_return() logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration def generate_headers(self): keys = list() try: row = next(self.row_iter) for each in row: keys.append(each.value) except StopIteration: pass return keys def get_next_row(self): ret_item = dict() r = next(self.row_iter) for key, cell in zip(self.headers, r): ret_item[key] = cell.value return ret_item def __iter__(self): for row_num in range(self.max_row): if row_num == 0: continue row_num += 1 self.total_count += 1 row = self.get_next_row() if self.config.filter: row = self.config.filter(row) if not row: self.miss_count += 1 continue self.responses.append(row) if len(self.responses) > self.config.per_limit: self.curr_size += len(self.responses) yield self.clear_and_return() if self.responses: yield self.responses logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() def __del__(self): self.wb.close() def clear_and_return(self): resp = self.responses self.responses = list() return resp PK2K5idataapi_transform/DataProcess/DataGetter/__init__.pyPK{KV7idataapi_transform/DataProcess/DataWriter/BaseWriter.pyimport abc from ..Meta.BaseDataProcess import BaseDataProcess class BaseWriter(BaseDataProcess, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): """ :param config """ pass @abc.abstractmethod async def write(self, responses): pass @abc.abstractmethod def __enter__(self): pass @abc.abstractmethod def __exit__(self, exc_type, exc_val, exc_tb): pass PKVM'6idataapi_transform/DataProcess/DataWriter/CSVWriter.pyimport os import csv import types import logging from .BaseWriter import BaseWriter class CSVWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.file_already_exists = os.path.exists(self.config.filename) and os.path.getsize(self.config.filename) self.f_out = open(self.config.filename, self.config.mode, encoding=self.config.encoding, newline="") self.f_csv = None self.headers = dict() if not self.config.headers else self.config.headers self.total_miss_count = 0 self.success_count = 0 def write(self, responses): miss_count = 0 # filter if self.config.filter: new_result = list() for each_response in responses: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue new_result.append(each_response) responses = new_result self.total_miss_count += miss_count # all filtered if not responses: logging.info("%s write 0 item, filtered %d item" % (self.config.filename, miss_count)) return # expand if self.config.expand: responses = [self.expand_dict(i, max_expand=self.config.expand) for i in responses] else: responses = [i for i in responses] if isinstance(responses, types.GeneratorType) else responses # headers if not self.f_csv: if "a" in self.config.mode and self.file_already_exists: self.headers = self.generate_headers(responses, append_mode=True) self.f_csv = csv.DictWriter(self.f_out, self.headers) else: if not self.headers: self.headers = self.generate_headers(responses) self.f_csv = csv.DictWriter(self.f_out, self.headers) self.f_csv.writeheader() # encoding process for each_response in responses: for k, v in each_response.items(): if v is None: each_response[k] = "" elif self.config.qsn and v != "" and (isinstance(v, (int, float)) or isinstance(v, str) and all(i.isdigit() for i in v)): each_response[k] = repr(str(v)) elif self.config.encoding not in ("utf8", "utf-8"): each_response[k] = str(v).encode(self.config.encoding, "ignore").decode(self.config.encoding) self.success_count += 1 self.f_csv.writerow(each_response) logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count)) def generate_headers(self, responses, append_mode=False): headers = set() for r in responses: for key in r.keys(): headers.add(key) if append_mode: f_in = open(self.config.filename, "r", encoding=self.config.encoding, newline="") reader = csv.DictReader(f_in) exists_fields = reader.fieldnames if set(exists_fields) != headers: raise ValueError("append mode for csv file: %s, but header field mismatch, exist fields: %s, generated fields: %s" % (self.config.filename, repr(exists_fields), repr(headers))) return exists_fields return list(headers) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.f_out.close() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.filename, self.total_miss_count, self.success_count)) PKEV}M/x 5idataapi_transform/DataProcess/DataWriter/ESWriter.pyimport logging import asyncio import random from .BaseWriter import BaseWriter from ..Config.MainConfig import main_config class ESWriter(BaseWriter): def __init__(self, config): if not main_config.has_es_configured: raise ValueError("You must config es_hosts before using ESWriter, Please edit configure file: %s" % (main_config.ini_path, )) super().__init__() self.config = config self.total_miss_count = 0 self.success_count = 0 self.fail_count = 0 async def write(self, responses): response = None # something to return origin_length = len(responses) if self.config.filter: responses = [self.config.filter(i) for i in responses] responses = [i for i in responses if i] miss_count = origin_length - len(responses) self.total_miss_count += miss_count if responses: if self.config.expand: responses = [self.expand_dict(i) for i in responses] try_time = 0 while try_time < self.config.max_retry: success, fail, response = await self.config.es_client.add_dict_to_es( self.config.indices, self.config.doc_type, responses, self.config.id_hash_func, self.config.app_code, self.config.actions, self.config.create_date, self.config.error_if_fail, self.config.timeout, self.config.auto_insert_createDate) if response is not None: self.success_count += success self.fail_count += fail logging.info("Write %d items to index: %s, doc_type: %s, fail: %d, filtered: %d" % ( len(responses), self.config.indices, self.config.doc_type, fail, miss_count)) break else: # exception happened try_time += 1 if try_time >= self.config.max_retry: logging.error("Fail to write after try: %d times, Write 0 items to index: %s, doc_type: %s" % (self.config.max_retry, self.config.indices, self.config.doc_type)) else: await asyncio.sleep(random.uniform(self.config.random_min_sleep, self.config.random_max_sleep)) else: # all filtered, or pass empty result logging.info("Write 0 items to index: %s, doc_type: %s (all filtered, or pass empty result)" % (self.config.indices, self.config.doc_type)) return response async def delete_all(self, body=None): """ inefficient delete """ if not body: body = { "query": { "match_all": {} } } result = await self.config.es_client.delete_by_query(index=self.config.indices, doc_type=self.config.doc_type, body=body, params={"conflicts": "proceed"}) return result def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): logging.info("%s->%s write done, total filtered %d item, total write %d item, total fail: %d item" % (self.config.indices, self.config.doc_type, self.total_miss_count, self.success_count, self.fail_count)) PKU+L ??7idataapi_transform/DataProcess/DataWriter/JsonWriter.pyimport json import logging from .BaseWriter import BaseWriter class JsonWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.total_miss_count = 0 self.success_count = 0 self.f_out = open(self.config.filename, self.config.mode, encoding=self.config.encoding) def write(self, responses): miss_count = 0 for each_response in responses: if self.config.expand: each_response = self.expand_dict(each_response, max_expand=self.config.expand) if self.config.filter: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue self.f_out.write(json.dumps(each_response) + self.config.new_line) self.success_count += 1 self.total_miss_count += miss_count logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count)) def __exit__(self, exc_type, exc_val, exc_tb): self.f_out.close() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.filename, self.total_miss_count, self.success_count)) def __enter__(self): return self PKEV}MH %k k 8idataapi_transform/DataProcess/DataWriter/MongoWriter.pyimport json import asyncio import random import logging import traceback from .BaseWriter import BaseWriter InsertOne = DeleteMany = ReplaceOne = UpdateOne = None try: from pymongo import InsertOne, DeleteMany, ReplaceOne, UpdateOne except Exception: pass class MongoWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.total_miss_count = 0 self.success_count = 0 self.table_checked = False self.key_fields = list() async def write(self, responses): self.config.get_mongo_cli() # init mysql pool miss_count = 0 original_length = len(responses) if self.config.filter: target_responses = list() for i in responses: i = self.config.filter(i) if i: target_responses.append(i) else: miss_count += 1 responses = target_responses if not responses: self.finish_once(miss_count, original_length) return # After filtered, still have responses to write if await self.perform_write(responses): self.finish_once(miss_count, original_length) def __exit__(self, exc_type, exc_val, exc_tb): logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.name, self.total_miss_count, self.success_count)) def __enter__(self): return self def finish_once(self, miss_count, original_length): self.total_miss_count += miss_count self.success_count += original_length logging.info("%s write %d item, filtered %d item" % (self.config.name, original_length - miss_count, miss_count)) async def perform_write(self, responses): try_time = 0 for each in responses: if self.config.auto_insert_createDate and self.config.createDate is not None: each["createDate"] = self.config.createDate if "_id" not in each: each["_id"] = self.config.id_hash_func(each) while try_time < self.config.max_retry: try: if UpdateOne is not None: await self.config.collection_cli.bulk_write([UpdateOne({'_id': each["_id"]}, {"$set": each}, upsert=True) for each in responses]) else: bulk = self.config.collection_cli.initialize_ordered_bulk_op() for each in responses: bulk.find({"_id": each["_id"]}).upsert().replace_one(each) await bulk.execute() return True except Exception as e: try_time += 1 if try_time < self.config.max_retry: logging.error("retry: %d, %s" % (try_time, str(e))) await asyncio.sleep(random.uniform(self.config.random_min_sleep, self.config.random_max_sleep)) else: logging.error("Give up MongoWriter writer: %s, After retry: %d times, still fail to write, " "total write %d items, total filtered: %d items, reason: %s" % (self.config.name, self.config.max_retry, self.success_count, self.total_miss_count, str(traceback.format_exc()))) return False PKEV}M5Z,O8idataapi_transform/DataProcess/DataWriter/MySQLWriter.pyimport json import asyncio import random import logging import traceback from .BaseWriter import BaseWriter class MySQLWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.total_miss_count = 0 self.success_count = 0 self.table_checked = False self.key_fields = list() async def write(self, responses): await self.config.get_mysql_pool_cli() # init mysql pool miss_count = 0 original_length = len(responses) if self.config.filter: target_responses = list() for i in responses: i = self.config.filter(i) if i: target_responses.append(i) else: miss_count += 1 responses = target_responses if not responses: self.finish_once(miss_count, original_length) return # After filtered, still have responses to write if not self.table_checked: await self.table_check(responses) if await self.perform_write(responses): self.finish_once(miss_count, original_length) def __exit__(self, exc_type, exc_val, exc_tb): self.config.free_resource() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.name, self.total_miss_count, self.success_count)) def __enter__(self): return self def finish_once(self, miss_count, original_length): self.total_miss_count += miss_count self.success_count += original_length logging.info("%s write %d item, filtered %d item" % (self.config.name, original_length - miss_count, miss_count)) async def table_check(self, responses): await self.config.cursor.execute("SHOW TABLES LIKE '%s'" % (self.config.table, )) result = await self.config.cursor.fetchone() if result is None: await self.create_table(responses) # check field await self.config.cursor.execute("DESC %s" % (self.config.table, )) results = await self.config.cursor.fetchall() fields = set(i[0] for i in results) self.key_fields = list(i[0] for i in results) real_keys = set(responses[0].keys()) difference_set = real_keys.difference(fields) if difference_set: # real keys not subset of fields raise ValueError("Field %s not in MySQL Table: %s" % (str(difference_set), self.config.table)) self.table_checked = True async def create_table(self, responses): test_response = dict() for response in responses[:50]: for k, v in response.items(): if k not in test_response: test_response[k] = v elif test_response[k] is None: test_response[k] = v elif isinstance(v, dict) or isinstance(v, list): if len(json.dumps(test_response[k])) < len(json.dumps(v)): test_response[k] = v elif v is not None and test_response[k] < v: test_response[k] = v sql = """ CREATE TABLE `%s` ( """ % (self.config.table, ) first_field = True for key, value in responses[0].items(): if "Count" in key: field_type = "BIGINT" elif value is None: field_type = "TEXT" elif key in ("content", ) or isinstance(value, dict) or isinstance(value, list): field_type = "TEXT" elif isinstance(value, bool): field_type = "BOOLEAN" elif isinstance(value, int): field_type = "BIGINT" elif isinstance(value, float): field_type = "DOUBLE" # varchar can store at most 65536 bytes, utf8 occupy 1-8 bytes per character, # so length should be less than 65536 / 8 = 8192 # assume this field (the shortest length) * 4 <= the longest length(8192) elif len(value) > 2048: field_type = "TEXT" else: length = len(value) * 4 if length < 256: length = 256 field_type = "VARCHAR(%d)" % (length, ) sql += ("\t" if first_field else "\t\t") + "`%s` %s" % (key, field_type) if key == "id": sql += " NOT NULL,\n" else: sql += ",\n" if first_field: first_field = False tail_sql = """ \tPRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=%s """ % (self.config.encoding, ) sql += tail_sql logging.info("Creating table: %s\n%s", self.config.table, sql) await self.config.cursor.execute(sql) await self.config.connection.commit() logging.info("table created") async def perform_write(self, responses): sql = "REPLACE INTO %s VALUES " % (self.config.table, ) for each in responses: curr_sql = '(' for field in self.key_fields: val = each[field] if isinstance(val, dict) or isinstance(val, list): val = json.dumps(val) if val is None: curr_sql += 'NULL,' else: curr_sql += repr(val) + "," curr_sql = curr_sql[:-1] + '),\n' sql += curr_sql sql = sql[:-2] try_time = 0 while try_time < self.config.max_retry: try: await self.config.cursor.execute(sql.encode(self.config.encoding)) await self.config.cursor.connection.commit() return True except Exception as e: try_time += 1 if try_time < self.config.max_retry: logging.error("retry: %d, %s" % (try_time, str(e))) await asyncio.sleep(random.uniform(self.config.random_min_sleep, self.config.random_max_sleep)) else: logging.error("Give up MySQL writer: %s, After retry: %d times, still fail to write, " "total write %d items, total filtered: %d items, reason: %s" % (self.config.name, self.config.max_retry, self.success_count, self.total_miss_count, str(traceback.format_exc()))) return False PKPV}Mɂ 8idataapi_transform/DataProcess/DataWriter/RedisWriter.pyimport logging import asyncio import random import traceback import json import zlib from .BaseWriter import BaseWriter class RedisWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.total_miss_count = 0 self.success_count = 0 def encode(self, dict_object): string = json.dumps(dict_object) if self.config.compress: string = zlib.compress(string.encode(self.config.encoding)) return string async def write(self, responses): await self.config.get_redis_pool_cli() # init redis pool miss_count = 0 target_responses = list() for each_response in responses: if self.config.filter: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue target_responses.append(each_response) self.success_count += 1 self.total_miss_count += miss_count if target_responses: try_time = 0 while try_time < self.config.max_retry: try: if self.config.is_range: await self.config.redis_write_method(self.config.key, *(self.encode(i) for i in target_responses)) else: pipe_line = self.config.redis_pool_cli.pipeline() for each in responses: pipe_line.hset(self.config.key, each["id"], self.encode(each)) await pipe_line.execute() logging.info("%s write %d item, filtered %d item" % (self.config.name, len(responses), miss_count)) break except Exception as e: try_time += 1 if try_time >= self.config.max_retry: logging.error("Fail to write after try: %d times, Write 0 items to redis, " "filtered %d item before write, error: %s" % (self.config.max_retry, miss_count, str(traceback.format_exc()))) else: await asyncio.sleep(random.uniform(self.config.random_min_sleep, self.config.random_max_sleep)) else: logging.info("Write 0 items to %s, filtered: %d, (all filtered, or pass empty result)" % (self.config.name, miss_count)) def __exit__(self, exc_type, exc_val, exc_tb): logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.name, self.total_miss_count, self.success_count)) def __enter__(self): return self PKU+LA\UU6idataapi_transform/DataProcess/DataWriter/TXTWriter.pyimport logging from .BaseWriter import BaseWriter class TXTWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.f_out = open(config.filename, config.mode, encoding=config.encoding) self.total_miss_count = 0 self.success_count = 0 def write(self, responses): miss_count = 0 for each_response in responses: if self.config.expand: each_response = self.expand_dict(each_response, max_expand=self.config.expand) if self.config.filter: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue self.f_out.write(self.config.join_val.join(str(value) for value in each_response.values()) + self.config.new_line) self.success_count += 1 self.total_miss_count += miss_count logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count)) def __exit__(self, exc_type, exc_val, exc_tb): self.f_out.close() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.filename, self.total_miss_count, self.success_count)) def __enter__(self): return self PK~MUjj7idataapi_transform/DataProcess/DataWriter/XLSXWriter.pyimport re import os import logging from openpyxl import Workbook, load_workbook from .BaseWriter import BaseWriter from openpyxl.cell.cell import ILLEGAL_CHARACTERS_RE from ..Config.DefaultValue import DefaultVal _warning = False class XLSXWriter(BaseWriter): def __init__(self, config): global _warning super().__init__() self.config = config self.col_dict = dict() self.row = 2 # headers self.header_generated = False self.file_already_exists = os.path.exists(self.config.filename) if "a" in self.config.mode and self.file_already_exists: self.wb = load_workbook(filename=self.config.filename, read_only=False) self.generate_header(from_file=True) else: self.wb = Workbook() self.ws1 = self.wb.active self.ws1.title = config.title self.total_miss_count = 0 self.success_count = 0 if not _warning: logging.warning("XLSXWriter will actually write to file when __exit__ of XLSXWriter called") _warning = True def write(self, responses): if not self.header_generated and self.config.headers: self.generate_header() miss_count = 0 for each_response in responses: if self.config.expand: each_response = self.expand_dict(each_response, max_expand=self.config.expand) if self.config.filter: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue for key, value in each_response.items(): if key not in self.col_dict: self.col_dict[key] = len(self.col_dict) + 1 self.ws1.cell(row=1, column=self.col_dict[key], value=key) value = str(value) if value is not None else "" try: self.ws1.cell(row=self.row, column=self.col_dict[key], value=value) except Exception: new_value = re.sub(ILLEGAL_CHARACTERS_RE, "", value) logging.warning("row num: %d, key: %s, value: %s contains illegal characters, " "replaced illegal characters to: %s" % (self.row, key, value, new_value)) self.ws1.cell(row=self.row, column=self.col_dict[key], value=new_value) self.row += 1 self.success_count += 1 logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count)) def __exit__(self, exc_type, exc_val, exc_tb): self.wb.save(filename=self.config.filename) self.wb.close() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.filename, self.total_miss_count, self.success_count)) def __enter__(self): return self def generate_header(self, from_file=False): if from_file: if not self.wb.worksheets: # empty file return sheet = self.wb.worksheets[self.config.sheet_index] row_iter = sheet.rows try: row = next(row_iter) for each in row: self.col_dict[each.value] = len(self.col_dict) + 1 except StopIteration: # empty file return if len(self.col_dict) == 1 and list(self.col_dict.keys())[0] is None: # empty file self.col_dict.clear() return max_row = sheet.max_row self.row = max_row + 1 else: for key in self.config.headers: self.col_dict[key] = len(self.col_dict) + 1 self.ws1.cell(row=1, column=self.col_dict[key], value=key) self.header_generated = True PKK5idataapi_transform/DataProcess/DataWriter/__init__.pyPKKB6idataapi_transform/DataProcess/Meta/BaseDataProcess.py class BaseDataProcess(object): @staticmethod def expand_dict(origin_item, max_expand=0, current_expand=0, parent_key=None, parent_item=None): if max_expand == 0: return origin_item if max_expand != -1 and current_expand >= max_expand: return origin_item if parent_key: if isinstance(origin_item, dict): for sub_k, sub_v in origin_item.items(): parent_item[parent_key + "_" + sub_k] = sub_v if parent_key in parent_item: del parent_item[parent_key] elif isinstance(origin_item, list): for item in origin_item: BaseDataProcess.expand_dict(item, max_expand, current_expand + 1, parent_key, parent_item) return origin_item keys = [k for k in origin_item.keys()] has_sub_dict = False for k in keys: if isinstance(origin_item[k], dict): has_sub_dict = True sub_dict = origin_item[k] for sub_k, sub_v in sub_dict.items(): origin_item[k + "_" + sub_k] = sub_v del origin_item[k] elif isinstance(origin_item[k], list): for item in origin_item[k]: BaseDataProcess.expand_dict(item, max_expand, current_expand + 1, k, origin_item) if has_sub_dict: return BaseDataProcess.expand_dict(origin_item, max_expand, current_expand + 1) else: return origin_item PKgK/idataapi_transform/DataProcess/Meta/__init__.pyPKhcMgAidataapi_transform/DataProcess/PersistentUtil/PersistentWriter.pyimport os import json import time import hashlib import logging class PersistentWriter(object): def __init__(self, persistent_key): self.f_name = persistent_key + ".json" self.latest_record = set() self.load_last_record() self.f_out = open(self.f_name, "a+", encoding="utf8") self.prev_latest_record_num = len(self.latest_record) def load_last_record(self): if os.path.exists(self.f_name): try: with open(self.f_name, "r", encoding="utf8") as f: self.latest_record = set(json.loads(f.read())["record"]) except Exception: logging.error("Broken record file: %s, recreating file" % (self.f_name, )) self.remove_file() def write(self): if len(self.latest_record) == self.prev_latest_record_num: return else: self.prev_latest_record_num = len(self.latest_record) self.truncate() self.f_out.seek(0) ts = int(time.time()) struct_time = time.localtime(ts) dt = time.strftime('%Y-%m-%d %H:%M:%S', struct_time) record = { "record": list(self.latest_record), "record_length": len(self.latest_record), "timestamp": ts, "date": dt, "filename": self.f_name } self.f_out.write(json.dumps(record)) logging.info("persistent to disk, f_name: %s, total_task_num: %d" % (self.f_name, len(self.latest_record))) def add(self, key): key = hashlib.md5(key.encode("utf8")).hexdigest() self.latest_record.add(key) def __contains__(self, item): key = hashlib.md5(item.encode("utf8")).hexdigest() return key in self.latest_record def sync(self): self.f_out.flush() def remove_file(self): os.unlink(self.f_name) def truncate(self): self.f_out.truncate(0) def clear(self, start_fresh_if_done): self.latest_record = None if start_fresh_if_done: self.remove_file() PKPSM9idataapi_transform/DataProcess/PersistentUtil/__init__.pyPK!H8053idataapi_transform-1.6.3.dist-info/entry_points.txtN+I/N.,()*)J+N/ʵLI,IL,Ȍ Y&fqqPKK11*idataapi_transform-1.6.3.dist-info/LICENSEThe MIT License (MIT) Copyright (c) 2017 zpoint Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!H>*RQ(idataapi_transform-1.6.3.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,rzd&Y)r$[)T&UrPK!Hٓ: +idataapi_transform-1.6.3.dist-info/METADATAmn0 ~"Ҧ1iQϵi]jM$/lH$ϱ3'%zL:J0[oQJV}:m d=n!=x ɞ XeOGy E \h>^Nūn)1 'lþi`4+] R.t}|;mKw^qkm%NτZ&§ l-!UZPK!HR'x)idataapi_transform-1.6.3.dist-info/RECORDI}b`D y_̪~GzE">ƽ7MQ4DvC3 Gc!86+Whj$$.!0AixmL85Rl%qk>v~GI2#X( lFE8ǟ(ay'(g{Wqj1`t^sY PP-WE_AD#yKjZRへ%#1vD4.44s]ۣMmY<ǐ{T'HH /{+77 |4ԓ՗I;x5mEpV! qk."P_kQU ܹ87K|l  Y'ؾY<sM(N.[YIw4jv31gmx(&]T+ .Eeg-NcBfQ Xd[HDQ|f^࿗3Of\x}2<2 M.pI"*tcokN&qYrd#@RfFc *~ 2tbbgESF߸FnVT^0GUOLS.L &}u S(bڝvJWnF I3O\NN .QEĕ`6ܦ;Smן.}? MhC _6`e$h8u~ޒh:4|-No>ffpcV/\FG88e>q宴@eZ}bh:y;W.W*P\"`N+KcWRW~7Y _v'Mv`(.ז8AyfzA_ ` WIP6V dq*.e 7ӖQ8Sh1!ķ2($SvٰkQ>%~+ib8q;i&b\X;ӑidPi#J:GxwW-婆T܃2۟ک[ 5VJ~?޿Y>{ 0r ` [?\ hY9 6[v62ix#0*}J:`f_hI;%/O'NV) sx+1m'Ik2, <.5//W޸eϦyFiiČFBx}W3\8U~}s+}j *d=ID$VSz6*{R*}7"> ޸|;fP|am8W ̺Wxq<ţbLRp/NQ $}{űxVJNA |'p L-I.8J(汃ƲQ#jbKS- a\J0NAO+Ƃ}')zo)OXzΤ7GQn7(-;2yu$߹?&=bvzuqi3۽v!JoH Ô;'D~BA( ,ڴ%컢m3r7{B7C)Y Av=u1Y[I %S秓kĨK8!bAP /VN#fSlT<Ϥ"@Llgl;4Y٩&AYNP̈j|u^*|ܶ;>S&PKtM1Oidataapi_transform/__init__.pyPK}Lfe""idataapi_transform/cli.pyPK~Lɭ01 1 0'idataapi_transform/DataProcess/ProcessFactory.pyPK狗K*-3idataapi_transform/DataProcess/__init__.pyPK<9MCG8u3idataapi_transform/DataProcess/Config/ConnectorConfig.pyPKQkM̷% % 5N:idataapi_transform/DataProcess/Config/DefaultValue.pyPK[}Mi[,[,1Fidataapi_transform/DataProcess/Config/ESConfig.pyPK̈}Mio2psidataapi_transform/DataProcess/Config/LogConfig.pyPKC}M}}3|idataapi_transform/DataProcess/Config/MainConfig.pyPK VK1Pidataapi_transform/DataProcess/Config/__init__.pyPK-Mkt?idataapi_transform/DataProcess/Config/ConfigUtil/AsyncHelper.pyPKZK@>idataapi_transform/DataProcess/Config/ConfigUtil/BaseConfig.pyPKtMNE[w[w@idataapi_transform/DataProcess/Config/ConfigUtil/GetterConfig.pyPKUMԯSS@> idataapi_transform/DataProcess/Config/ConfigUtil/WriterConfig.pyPK`K<`idataapi_transform/DataProcess/Config/ConfigUtil/__init__.pyPK pMDEnY<<6aidataapi_transform/DataProcess/DataGetter/APIGetter.pyPKYKUYUU7iidataapi_transform/DataProcess/DataGetter/BaseGetter.pyPKCM2 2 6idataapi_transform/DataProcess/DataGetter/CSVGetter.pyPKEV}M^mm5idataapi_transform/DataProcess/DataGetter/ESGetter.pyPKCM?S4 7Yidataapi_transform/DataProcess/DataGetter/JsonGetter.pyPKEV}MCoL8idataapi_transform/DataProcess/DataGetter/MongoGetter.pyPKEV}M-8idataapi_transform/DataProcess/DataGetter/MySQLGetter.pyPKEV}MA נ8idataapi_transform/DataProcess/DataGetter/RedisGetter.pyPK)cLl<<7 idataapi_transform/DataProcess/DataGetter/XLSXGetter.pyPK2K5idataapi_transform/DataProcess/DataGetter/__init__.pyPK{KV7kidataapi_transform/DataProcess/DataWriter/BaseWriter.pyPKVM'6idataapi_transform/DataProcess/DataWriter/CSVWriter.pyPKEV}M/x 5+idataapi_transform/DataProcess/DataWriter/ESWriter.pyPKU+L ??7s9idataapi_transform/DataProcess/DataWriter/JsonWriter.pyPKEV}MH %k k 8?idataapi_transform/DataProcess/DataWriter/MongoWriter.pyPKEV}M5Z,O8Lidataapi_transform/DataProcess/DataWriter/MySQLWriter.pyPKPV}Mɂ 8fidataapi_transform/DataProcess/DataWriter/RedisWriter.pyPKU+LA\UU6qidataapi_transform/DataProcess/DataWriter/TXTWriter.pyPK~MUjj7widataapi_transform/DataProcess/DataWriter/XLSXWriter.pyPKK5Nidataapi_transform/DataProcess/DataWriter/__init__.pyPKKB6idataapi_transform/DataProcess/Meta/BaseDataProcess.pyPKgK/idataapi_transform/DataProcess/Meta/__init__.pyPKhcMgA_idataapi_transform/DataProcess/PersistentUtil/PersistentWriter.pyPKPSM9ٖidataapi_transform/DataProcess/PersistentUtil/__init__.pyPK!H80530idataapi_transform-1.6.3.dist-info/entry_points.txtPKK11*idataapi_transform-1.6.3.dist-info/LICENSEPK!H>*RQ(*idataapi_transform-1.6.3.dist-info/WHEELPK!Hٓ: +œidataapi_transform-1.6.3.dist-info/METADATAPK!HR'x)Eidataapi_transform-1.6.3.dist-info/RECORDPK,,