# ===== idataapi_transform/__init__.py =====
"""convert data from one format to another, read or write from file or database, suitable for iDataAPI"""
from .cli import main
from .DataProcess.Config.ConfigUtil import WriterConfig
from .DataProcess.Config.ConfigUtil import GetterConfig
from .DataProcess.ProcessFactory import ProcessFactory

__version__ = '1.3.9'


# ===== idataapi_transform/cli.py =====
import json
import asyncio
import argparse
from .DataProcess.Config.DefaultValue import DefaultVal
from .DataProcess.Config.ConfigUtil import GetterConfig
from .DataProcess.Config.ConfigUtil import WriterConfig
from .DataProcess.ProcessFactory import ProcessFactory


class Args(object):
    from_choices = ["API", "ES", "CSV", "XLSX", "JSON", "REDIS", "MYSQL", "MONGO"]
    from_desc = "argument 'from' can only be set to one of 'API', 'ES', 'CSV', 'XLSX', " \
                "'JSON' (means json line-by-line file), 'REDIS', 'MYSQL' or 'MONGO'"

    to_choices = ["csv", "xlsx", "json", "txt", "es", "redis", "mysql", "mongo"]
    to_desc = "argument 'to' can only be set to one of \"csv\", \"xlsx\", \"json\", \"txt\", \"es\", \"redis\", " \
              "\"mysql\" or \"mongo\". \"json\" will write 'json.dumps(item)' line by line. " \
              "\"txt\" will write each item line by line, each element in a line separated by 'space' by default"

    source_desc = """
    argument 'source',
    When argument 'from' is set to 'ES', source should be 'index:doc_type'
    When argument 'from' is set to 'API', source should be 'http://...'
    When argument 'from' is set to 'REDIS', source should be the key name
    When argument 'from' is set to 'MYSQL', source should be the table name
    When argument 'from' is set to others, source should be a file path
    """

    dest_desc = "argument 'dest', filename to save the result, no suffix needed, " \
                "i.e. '/Desktop/result', default: './result'\n" \
                "When argument 'to' is set to 'es', dest should be 'index:doc_type'"

    per_limit_desc = "amount of data buffered; when the buffer is filled, the program writes the buffered data to 'dest', default 100"
    max_limit_desc = "write at most 'max_limit' items to 'dest'; if 'max_limit' is 0 or None, there is no limit, default None"
    retry_desc = "when fetching data fails, retry at most 'retry' times, default 3"
    r_encoding_desc = "encoding of input file, ignored for xlsx format, default 'utf8'"
    w_encoding_desc = "encoding of output file, ignored for xlsx format, default 'utf8'"
    filter_desc = "file containing a 'my_filter(item)' function used as a filter"
    param_file_desc = """When you have many items saved in id.json, --param_file './id.json::id::pid' means
    open './id.json', read each json object line by line, use each_json['id'] as the parameter 'pid' and
    add it to the tail part of 'source'. --param_file can be either
    "filename.json::json_param::request_param" or "filename.txt::request_param"
    """
    expand_desc = """If your item is {"a": {"b": "c"}, "b": "d"}, --expand 1 will make your item become
    {"a_b": "c", "b": "d"}, --expand N means expand at most N levels deep of your object,
    --expand -1 means expand all levels, --expand 0 means no expansion of your item. Default 0.
    """
    qsn_desc = """quote scientific notation, i.e. 4324234234234234123123 will become 4.32423423423423E+021
    in normal csv. If quoted like '4324234234234234123123', it won't become scientific notation.
    Only works for output format 'csv'. --qsn True means quote scientific notation,
    --qsn False means do not quote scientific notation"""
    query_body_desc = """ElasticSearch query body, size has same function as "--limit", i.e:
    body = {
        "size": 100,
        "_source": {
            "includes": ["location", "title", "city", "id"]
        },
        "query": {
            "bool": {
                "must": [
                    {
                        "term": {"appCode": {"value": "ctrip"}}
                    }
                ]
            }
        }
    }
    """
    write_mode_desc = """'w' or 'a+'"""
    key_type_desc = """redis data type to operate on, options: [LIST] or [HASH], default: [LIST]"""


getter_config_map = {
    Args.from_choices[0]: GetterConfig.RAPIConfig,
    Args.from_choices[1]: GetterConfig.RESConfig,
    Args.from_choices[2]: GetterConfig.RCSVConfig,
    Args.from_choices[3]: GetterConfig.RXLSXConfig,
    Args.from_choices[4]: GetterConfig.RJsonConfig,
    Args.from_choices[5]: GetterConfig.RRedisConfig,
    Args.from_choices[6]: GetterConfig.RMySQLConfig,
    Args.from_choices[7]: GetterConfig.RMongoConfig
}

writer_config_map = {
    Args.to_choices[0]: WriterConfig.WCSVConfig,
    Args.to_choices[1]: WriterConfig.WXLSXConfig,
    Args.to_choices[2]: WriterConfig.WJsonConfig,
    Args.to_choices[3]: WriterConfig.WTXTConfig,
    Args.to_choices[4]: WriterConfig.WESConfig,
    Args.to_choices[5]: WriterConfig.WRedisConfig,
    Args.to_choices[6]: WriterConfig.WMySQLConfig,
    Args.to_choices[7]: WriterConfig.WMongoConfig
}


def get_arg():
    parser = argparse.ArgumentParser(prog="idataapi_transform",
                                     description='convert data from one format to another, '
                                                 'read/write from file or database, suitable for iDataAPI')
    parser.add_argument("from", choices=Args.from_choices, help=Args.from_desc, type=str.upper)
    parser.add_argument("to", choices=Args.to_choices, help=Args.to_desc, type=str.lower)
    parser.add_argument("source", help=Args.source_desc)
    parser.add_argument("dest", help=Args.dest_desc, default=DefaultVal.dest, nargs="?")
    parser.add_argument("--per_limit", default=DefaultVal.per_limit, type=int, help=Args.per_limit_desc)
    parser.add_argument("--max_limit", default=DefaultVal.max_limit, type=int, help=Args.max_limit_desc)
    parser.add_argument("--max_retry", default=DefaultVal.max_retry, type=int, help=Args.retry_desc)
    parser.add_argument("--r_encoding", default=DefaultVal.default_encoding, help=Args.r_encoding_desc)
    parser.add_argument("--w_encoding", default=DefaultVal.default_encoding, help=Args.w_encoding_desc)
    parser.add_argument("--filter", default=None, help=Args.filter_desc)
    parser.add_argument("--expand", default=None, type=int, help=Args.expand_desc)
    parser.add_argument("--qsn", default=None, type=bool, help=Args.qsn_desc)
    parser.add_argument("--query_body", default=DefaultVal.query_body, type=str, help=Args.query_body_desc)
    parser.add_argument("--write_mode", default=DefaultVal.default_file_mode_w, type=str, help=Args.write_mode_desc)
    parser.add_argument("--key_type", default=DefaultVal.default_key_type, type=str.upper, help=Args.key_type_desc)
    return parser.parse_args()


def get_filter(filter_file):
    if not filter_file:
        return None
    # execute the user-supplied file in an explicit namespace instead of relying
    # on exec() mutating locals(), which is implementation dependent
    namespace = {}
    with open(filter_file, "r") as f:
        exec(f.read(), namespace)
    return namespace["my_filter"]
async def getter_to_writer(getter, writer):
    with writer as safe_writer:
        async for items in getter:
            if asyncio.iscoroutinefunction(safe_writer.write):
                await safe_writer.write(items)
            else:
                safe_writer.write(items)


def main():
    args = get_arg()
    from_ = getattr(args, "from")
    from_args = list()
    from_kwargs = dict()
    to_args = list()
    to_kwargs = dict()

    if from_ != Args.from_choices[0]:
        # not API
        from_args.extend(args.source.split(":"))
    else:
        from_args.extend([args.source])
    from_kwargs["encoding"] = args.r_encoding
    from_kwargs["key_type"] = args.key_type
    if args.query_body:
        try:
            from_kwargs["query_body"] = json.loads(args.query_body)
        except Exception:
            raise SyntaxError("--query_body must be json serialized")
    for key in ("per_limit", "max_limit", "max_retry"):
        from_kwargs[key] = getattr(args, key)

    to_kwargs["filter_"] = get_filter(args.filter)
    to_kwargs["encoding"] = args.w_encoding
    to_kwargs["mode"] = args.write_mode
    to_kwargs["key_type"] = args.key_type
    for key in ("max_retry", "expand", "qsn"):
        to_kwargs[key] = getattr(args, key)

    if from_ not in getter_config_map:
        raise ValueError("argument from must be in %s" % (str(Args.from_choices), ))
    getter_config = getter_config_map[from_](*from_args, **from_kwargs)
    getter = ProcessFactory.create_getter(getter_config)

    if args.to == Args.to_choices[4]:
        # es
        indices, doc_type = args.dest.split(":")
        to_args.append(indices)
        to_args.append(doc_type)
    elif args.to in Args.to_choices[5:]:
        # redis, mysql, mongo
        if args.dest == DefaultVal.dest:
            to_args.append(DefaultVal.dest_without_path)
        else:
            to_args.append(args.dest)
    else:
        dest = args.dest + "." + args.to
        to_args.append(dest)
    writer_config = writer_config_map[args.to](*to_args, **to_kwargs)
    writer = ProcessFactory.create_writer(writer_config)

    loop = asyncio.get_event_loop()
    loop.run_until_complete(getter_to_writer(getter, writer))


if __name__ == "__main__":
    main()
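# The getter -> writer pipeline that main() assembles from command-line
# arguments can also be driven directly.  A hedged sketch: the console script
# is referred to as "transform" in the config docstrings below, the URL is a
# placeholder, and the ES index/doc_type follow the RESConfig example:
#
#   transform API csv "http://..." ./result --max_limit 1000
#   transform ES json post20170630:news ./dump --query_body '{"size": 100}'
#
# The same pipeline built programmatically with the classes exported from
# idataapi_transform/__init__.py, following the Example sections in
# GetterConfig/WriterConfig:
#
#   import asyncio
#   from idataapi_transform import ProcessFactory, GetterConfig, WriterConfig
#
#   async def run():
#       getter = ProcessFactory.create_getter(GetterConfig.RAPIConfig("http://..."))
#       csv_config = WriterConfig.WCSVConfig("./result.csv", encoding="utf8")
#       with ProcessFactory.create_writer(csv_config) as csv_writer:
#           async for items in getter:
#               csv_writer.write(items)
#
#   asyncio.get_event_loop().run_until_complete(run())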
# ===== idataapi_transform/DataProcess/ProcessFactory.py =====
# config
from .Config.MainConfig import main_config
from .Config.ConfigUtil import GetterConfig
from .Config.ConfigUtil import WriterConfig

from .DataGetter.ESGetter import ESScrollGetter
from .DataGetter.CSVGetter import CSVGetter
from .DataGetter.APIGetter import APIGetter, APIBulkGetter
from .DataGetter.JsonGetter import JsonGetter
from .DataGetter.XLSXGetter import XLSXGetter
from .DataGetter.RedisGetter import RedisGetter
from .DataGetter.MySQLGetter import MySQLGetter
from .DataGetter.MongoGetter import MongoGetter

from .DataWriter.CSVWriter import CSVWriter
from .DataWriter.ESWriter import ESWriter
from .DataWriter.JsonWriter import JsonWriter
from .DataWriter.TXTWriter import TXTWriter
from .DataWriter.XLSXWriter import XLSXWriter
from .DataWriter.RedisWriter import RedisWriter
from .DataWriter.MySQLWriter import MySQLWriter
from .DataWriter.MongoWriter import MongoWriter


class ProcessFactory(object):
    config_getter_map = {
        GetterConfig.RAPIConfig: APIGetter,
        GetterConfig.RCSVConfig: CSVGetter,
        GetterConfig.RESConfig: ESScrollGetter,
        GetterConfig.RJsonConfig: JsonGetter,
        GetterConfig.RXLSXConfig: XLSXGetter,
        GetterConfig.RAPIBulkConfig: APIBulkGetter,
        GetterConfig.RRedisConfig: RedisGetter,
        GetterConfig.RMySQLConfig: MySQLGetter,
        GetterConfig.RMongoConfig: MongoGetter
    }

    config_writer_map = {
        WriterConfig.WCSVConfig: CSVWriter,
        WriterConfig.WESConfig: ESWriter,
        WriterConfig.WJsonConfig: JsonWriter,
        WriterConfig.WTXTConfig: TXTWriter,
        WriterConfig.WXLSXConfig: XLSXWriter,
        WriterConfig.WRedisConfig: RedisWriter,
        WriterConfig.WMySQLConfig: MySQLWriter,
        WriterConfig.WMongoConfig: MongoWriter
    }

    @staticmethod
    def create_getter(config):
        """
        create a getter based on config
        :return: getter
        """
        for config_class, getter_class in ProcessFactory.config_getter_map.items():
            if isinstance(config, config_class):
                return getter_class(config)
        raise ValueError("create_getter must pass an instance of one of [RAPIConfig, RCSVConfig, RESConfig, "
                         "RJsonConfig, RXLSXConfig, RAPIBulkConfig, RRedisConfig, RMySQLConfig, RMongoConfig]")

    @staticmethod
    def create_writer(config):
        """
        create a writer based on config
        :return: a writer
        """
        for config_class, writer_class in ProcessFactory.config_writer_map.items():
            if isinstance(config, config_class):
                return writer_class(config)
        raise ValueError("create_writer must pass an instance of one of [WCSVConfig, WESConfig, WJsonConfig, "
                         "WTXTConfig, WXLSXConfig, WRedisConfig, WMySQLConfig, WMongoConfig]")


# ===== idataapi_transform/DataProcess/__init__.py =====
# (empty)


# ===== idataapi_transform/DataProcess/Config/ConnectorConfig.py =====
import aiohttp
import asyncio
import inspect
from .MainConfig import main_config

main_config = main_config()
default_concurrency_limit = main_config["main"].getint("concurrency")


class _SessionManger(object):
    def __init__(self, concurrency_limit=default_concurrency_limit, loop=None):
        self.session = self._generate_session(concurrency_limit=concurrency_limit, loop=loop)

    @staticmethod
    def _generate_connector(limit=default_concurrency_limit, loop=None):
        """
        https://github.com/KeepSafe/aiohttp/issues/883
        if connector is passed to session, it is not available anymore
        """
        if not loop:
            loop = asyncio.get_event_loop()
        return aiohttp.TCPConnector(limit=limit, loop=loop)

    @staticmethod
    def _generate_session(concurrency_limit=default_concurrency_limit, loop=None):
        if not loop:
            loop = asyncio.get_event_loop()
        return aiohttp.ClientSession(connector=_SessionManger._generate_connector(limit=concurrency_limit, loop=loop),
                                     loop=loop)

    def get_session(self):
        return self.session

    def __del__(self):
        try:
            if inspect.iscoroutinefunction(self.session.close):
                loop = asyncio.get_event_loop()
                loop.run_until_complete(self.session.close())
            else:
                self.session.close()
        except Exception:
            pass


session_manger = _SessionManger()


# ===== idataapi_transform/DataProcess/Config/DefaultValue.py =====
import os
import hashlib
from .MainConfig import main_config

base_config = main_config()


class DefaultVal(object):
    per_limit = base_config["main"].getint("per_limit")
    max_limit = base_config["main"].get("max_limit")
    if max_limit != "None":
        max_limit = int(max_limit)
    else:
        max_limit = None
    max_retry = base_config["main"].getint("max_retry")
    random_min_sleep = base_config["main"].getint("random_min_sleep")
    random_max_sleep = base_config["main"].getint("random_max_sleep")

    default_file_mode_r = "r"
    default_file_mode_w = "w"
    default_encoding = "utf8"
    new_line = "\n"
    join_val = " "
    title = "example"
    qsn = None
    query_body = None
    dest_without_path = "result"
    dest = os.getcwd() + "/" + dest_without_path
    interval = 5
    concurrency = 50
    default_key_type = "LIST"

    @staticmethod
    def default_id_hash_func(item):
        # hash "appCode_id" when both fields are present, otherwise hash the whole item
        if "appCode" in item and item["appCode"] and "id" in item and item["id"]:
            value = (item["appCode"] + "_" + item["id"]).encode("utf8")
        else:
            value = str(item).encode("utf8")
        return hashlib.md5(value).hexdigest()


# ===== idataapi_transform/DataProcess/Config/ESConfig.py =====
import asyncio
import aiohttp
import json
import time
import logging
import copy
from elasticsearch_async.connection import AIOHttpConnection as OriginAIOHttpConnection
from elasticsearch_async.transport import AsyncTransport as OriginAsyncTransport
from elasticsearch_async.transport import ensure_future
from elasticsearch import TransportError
from aiohttp.client_exceptions import ServerFingerprintMismatch
from elasticsearch.exceptions import ConnectionError, ConnectionTimeout, SSLError
from elasticsearch.compat import
urlencode from elasticsearch.connection_pool import ConnectionPool, DummyConnectionPool from elasticsearch_async import AsyncTransport from elasticsearch_async import AsyncElasticsearch es_hosts = None if hasattr(aiohttp, "Timeout"): async_timeout_func = aiohttp.Timeout else: import async_timeout async_timeout_func = async_timeout.timeout def init_es(hosts, es_headers, timeout_): global es_hosts, AsyncElasticsearch, AsyncTransport es_hosts = hosts if not es_hosts: return False class MyAsyncTransport(OriginAsyncTransport): """ Override default AsyncTransport to add timeout """ def perform_request(self, method, url, params=None, body=None, timeout=None, headers=None): if body is not None: body = self.serializer.dumps(body) # some clients or environments don't support sending GET with body if method in ('HEAD', 'GET') and self.send_get_body_as != 'GET': # send it as post instead if self.send_get_body_as == 'POST': method = 'POST' # or as source parameter elif self.send_get_body_as == 'source': if params is None: params = {} params['source'] = body body = None if body is not None: try: body = body.encode('utf-8') except (UnicodeDecodeError, AttributeError): # bytes/str - no need to re-encode pass ignore = () if params: ignore = params.pop('ignore', ()) if isinstance(ignore, int): ignore = (ignore,) return ensure_future(self.main_loop(method, url, params, body, ignore=ignore, timeout=timeout, headers=headers), loop=self.loop) @asyncio.coroutine def main_loop(self, method, url, params, body, ignore=(), timeout=None, headers=None): for attempt in range(self.max_retries + 1): connection = self.get_connection() try: status, headers, data = yield from connection.perform_request( method, url, params, body, ignore=ignore, timeout=timeout, headers=headers) except TransportError as e: if method == 'HEAD' and e.status_code == 404: return False retry = False if isinstance(e, ConnectionTimeout): retry = self.retry_on_timeout elif isinstance(e, ConnectionError): retry = True elif e.status_code in self.retry_on_status: retry = True if retry: # only mark as dead if we are retrying self.mark_dead(connection) # raise exception on last retry if attempt == self.max_retries: raise else: raise else: if method == 'HEAD': return 200 <= status < 300 # connection didn't fail, confirm it's live status self.connection_pool.mark_live(connection) if data: data = self.deserializer.loads(data, headers.get('content-type')) return data class AIOHttpConnection(OriginAIOHttpConnection): """ Override default AIOHttpConnection.perform_request to add headers """ @asyncio.coroutine def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None): url_path = url if params: url_path = '%s?%s' % (url, urlencode(params or {})) url = self.base_url + url_path start = self.loop.time() response = None local_headers = es_headers if headers: local_headers = copy.deepcopy(es_headers) if es_headers else dict() local_headers.update(headers) try: with async_timeout_func(timeout or timeout_ or self.timeout): response = yield from self.session.request(method, url, data=body, headers=local_headers) raw_data = yield from response.text() duration = self.loop.time() - start except Exception as e: self.log_request_fail(method, url, url_path, body, self.loop.time() - start, exception=e) if isinstance(e, ServerFingerprintMismatch): raise SSLError('N/A', str(e), e) if isinstance(e, asyncio.TimeoutError): raise ConnectionTimeout('TIMEOUT', str(e), e) raise ConnectionError('N/A', str(e), e) finally: if response is not 
None: yield from response.release() # raise errors based on http status codes, let the client handle those if needed if not (200 <= response.status < 300) and response.status not in ignore: self.log_request_fail(method, url, url_path, body, duration, status_code=response.status, response=raw_data) self._raise_error(response.status, raw_data) self.log_request_success(method, url, url_path, body, response.status, raw_data, duration) return response.status, response.headers, raw_data class MyAsyncElasticsearch(AsyncElasticsearch): async def add_dict_to_es(self, indices, doc_type, items, id_hash_func, app_code=None, actions=None, create_date=None, error_if_fail=True, timeout=None, auto_insert_createDate=True): if not actions: actions = "index" body = "" for item in items: if app_code: item["appCode"] = app_code if auto_insert_createDate and "createDate" not in item: if create_date: item["createDate"] = create_date else: item["createDate"] = int(time.time()) action = { actions: { "_index": indices, "_type": doc_type, "_id": id_hash_func(item) } } if actions == "update": item = {"doc": item} body += json.dumps(action) + "\n" + json.dumps(item) + "\n" try: success = fail = 0 r = await self.transport.perform_request("POST", "/_bulk?pretty", body=body, timeout=timeout) if r["errors"]: for item in r["items"]: for k, v in item.items(): if "error" in v: if error_if_fail: # log error logging.error(json.dumps(v["error"])) fail += 1 else: success += 1 else: success = len(r["items"]) return success, fail, r except Exception as e: import traceback logging.error(traceback.format_exc()) logging.error("elasticsearch Exception, give up: %s" % (str(e), )) return None, None, None def __del__(self): """ compatible with elasticsearch-async-6.1.0 """ try: loop = asyncio.get_event_loop() loop.run_until_complete(self.transport.close()) except Exception: pass OriginAIOHttpConnection.perform_request = AIOHttpConnection.perform_request OriginAsyncTransport.perform_request = MyAsyncTransport.perform_request OriginAsyncTransport.main_loop = MyAsyncTransport.main_loop AsyncElasticsearch = MyAsyncElasticsearch return True global_client = None def get_es_client(): global global_client if global_client is None: global_client = AsyncElasticsearch(hosts=es_hosts) return global_client """ below add in version 1.0.9, compatible for "close()" ===> future object """ def close(self): try: asyncio.ensure_future(self.connection.close()) except TypeError: pass def connection_pool_close(self): for conn in self.orig_connections: try: asyncio.ensure_future(conn.close()) except TypeError: pass ConnectionPool.close = connection_pool_close DummyConnectionPool.close = close PK⌞K S2idataapi_transform/DataProcess/Config/LogConfig.pyimport os import logging from logging.handlers import RotatingFileHandler format_str = "%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s" date_formatter_str = '[%Y-%m-%d %H:%M:%S]' formatter = logging.Formatter(format_str, datefmt=date_formatter_str) class SingleLevelFilter(logging.Filter): def __init__(self, passlevel, reject): super(SingleLevelFilter, self).__init__() self.passlevel = passlevel self.reject = reject def filter(self, record): if self.reject: return record.levelno != self.passlevel else: return record.levelno == self.passlevel def init_log(log_dir, max_log_file_bytes, ini_path): root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) # console console = logging.StreamHandler() console.setFormatter(formatter) root_logger.addHandler(console) if log_dir: if not 
os.path.exists(log_dir): logging.error("log_dir(%s) in configure file(%s) not exists, I will not log to file" % (log_dir, ini_path)) return False if not max_log_file_bytes: logging.error("log_byte not set, please configure log_byte in configure file(%s), " "or I will not log to file" % (ini_path, )) return False # info h1 = RotatingFileHandler("%s/info.log" % (log_dir, ), mode="a", maxBytes=max_log_file_bytes, encoding="utf8", backupCount=1) h1.setFormatter(formatter) f1 = SingleLevelFilter(logging.INFO, False) h1.addFilter(f1) root_logger.addHandler(h1) # error h1 = RotatingFileHandler("%s/error.log" % (log_dir, ), mode="a", maxBytes=max_log_file_bytes, encoding="utf8", backupCount=1) h1.setFormatter(formatter) f1 = SingleLevelFilter(logging.ERROR, False) h1.addFilter(f1) root_logger.addHandler(h1) return True return False PK5Ld3idataapi_transform/DataProcess/Config/MainConfig.pyimport os import json import configparser from os.path import expanduser from .LogConfig import init_log from .ESConfig import init_es default_configure_content = """ [main] # default max concurrency value for APIGetter concurrency = 50 # buffer size per_limit = 100 # fetch at most max_limit items max_limit = None # max retry for getter before give up if fail to get data max_retry = 3 # sleep interval if fail random_min_sleep = 1 random_max_sleep = 3 [es] # elasticsearch host # hosts = ["localhost:9393"] # elasticsearch headers when perform http request # headers = {"Host": "localhost", "value": "value"} # request timeout, seconds # timeout = 10 [log] # a directory to save log file # path = /Users/zpoint/Desktop/idataapi-transform/logs/ # max byte per log file # log_byte = 5242880 """ redis_config_content = """ [redis] host = localhost port = 0 db = 0 password = timeout = 3 encoding = utf8 # whether need to del the key after get object from redis, 0 means false, 1 means true need_del = 0 # default direction when read/write , "L" means lpop/lpush, "R" means rpop/rpush direction = L """ mysql_config_content = """ [mysql] host = localhost port = 0 user = root password = database = # default charset encoding = utf8 """ mongo_config_content = """ [mongo] host = localhost port = 0 username = password = database = test_database """ class MainConfig(object): def __init__(self, ini_path=None): # singleton if not hasattr(self, "__instance"): self.ini_path = ini_path if not self.ini_path: home = expanduser("~") self.ini_path = home + "/idataapi-transform.ini" if not os.path.exists(self.ini_path): with open(self.ini_path, "w") as f: f.write(default_configure_content + redis_config_content + mysql_config_content + mongo_config_content) self.__instance = configparser.ConfigParser() self.__instance.read(self.ini_path) MainConfig.__instance = self.__instance self.has_log_file = self.__instance.has_log_file = self.config_log() self.has_es_configured = self.__instance.has_es_configured = self.config_es() self.has_redis_configured = self.__instance.has_redis_configured = self.config_redis() self.has_mysql_configured = self.__instance.has_mysql_configured = self.config_mysql() self.has_mongo_configured = self.__instance.has_mongo_configured = self.config_mongo() self.__instance.ini_path = self.ini_path def __call__(self): return self.__instance def config_log(self): max_log_file_bytes = self.__instance["log"].getint("log_byte") log_path = self.__instance["log"].get("path") return init_log(log_path, max_log_file_bytes, self.ini_path) def config_es(self): hosts = self.__instance["es"].get("hosts") timeout = 
self.__instance["es"].getint("timeout") if hosts: try: hosts = json.loads(hosts) except Exception as e: raise ValueError("es host must be json serialized") headers = self.__instance["es"].get("headers") if headers and headers != "None": try: headers = json.loads(headers) except Exception as e: raise ValueError("es headers must be json serialized") else: headers = None return init_es(hosts, headers, timeout) def config_redis(self): try: self.__instance["redis"].get("port") except KeyError as e: with open(self.ini_path, "a+") as f: f.write(redis_config_content) self.__instance.read(self.ini_path) port = self.__instance["redis"].getint("port") return port > 0 def config_mysql(self): try: self.__instance["mysql"].get("port") except KeyError as e: with open(self.ini_path, "a+") as f: f.write(mysql_config_content) self.__instance.read(self.ini_path) port = self.__instance["mysql"].getint("port") return port > 0 def config_mongo(self): try: self.__instance["mongo"].get("port") except KeyError as e: with open(self.ini_path, "a+") as f: f.write(mongo_config_content) self.__instance.read(self.ini_path) port = self.__instance["mongo"].getint("port") return port > 0 main_config = MainConfig() PK VK1idataapi_transform/DataProcess/Config/__init__.pyPKZK@>idataapi_transform/DataProcess/Config/ConfigUtil/BaseConfig.pyimport abc class BaseGetterConfig(object, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): pass class BaseWriterConfig(object, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): pass PK,^L{v\v\@idataapi_transform/DataProcess/Config/ConfigUtil/GetterConfig.pyimport asyncio import inspect import aioredis try: import aiomysql except Exception as e: pass try: import motor.motor_asyncio except Exception as e: pass from .BaseConfig import BaseGetterConfig from ..ESConfig import get_es_client from ..DefaultValue import DefaultVal from ..ConnectorConfig import session_manger, main_config class RAPIConfig(BaseGetterConfig): def __init__(self, source, per_limit=DefaultVal.per_limit, max_limit=DefaultVal.max_limit, max_retry=DefaultVal.max_retry, random_min_sleep=DefaultVal.random_min_sleep, random_max_sleep=DefaultVal.random_max_sleep, session=None, filter_=None, return_fail=False, tag=None, **kwargs): """ will request until no more next_page to get, or get "max_limit" items :param source: API to get, i.e. "http://..." 
:param per_limit: how many items to get per time :param max_limit: get at most max_limit items, if not set, get all :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param session: aiohttp session to perform request :param filter_: run "transform --help" to see command line interface explanation for detail :param return_fail: if set to True, for each iteration, will return a tuple, api_getter = ProcessFactory.create_getter(RAPIConfig("http://...")) async for items, bad_objects in getter: A = bad_objects[0] A.response: -> json object: '{"appCode": "weixinpro", "dataType": "post", "message": "param error", "retcode": "100005"}', if fail in request, response will be None A.tag: -> tag you pass to RAPIConfig A.source: -> source you pass to RAPIConfig :param args: :param kwargs: Example: api_config = RAPIConfig("http://...") api_getter = ProcessFactory.create_getter(api_config) async for items in api_getter: print(items) """ super().__init__() self.source = source self.per_limit = per_limit self.max_limit = max_limit self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.session = session_manger.get_session() if not session else session self.filter = filter_ self.return_fail = return_fail self.tag = tag class RCSVConfig(BaseGetterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_r, encoding=DefaultVal.default_encoding, per_limit=DefaultVal.per_limit, max_limit=DefaultVal.max_limit, filter_=None, **kwargs): """ :param filename: filename to read :param mode: file open mode, i.e "r" :param encoding: file encoding i.e "utf8" :param per_limit: how many items to get per time :param max_limit: get at most max_limit items, if not set, get all :param filter_: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: csv_config = RJsonConfig("./result.csv", encoding="gbk") csv_getter = ProcessFactory.create_getter(csv_config) async for items in csv_getter: print(items) # both async generator and generator implemented for items in csv_getter: print(items) """ super().__init__() self.filename = filename self.mode = mode self.encoding = encoding self.per_limit = per_limit self.max_limit = max_limit self.filter = filter_ class RESConfig(BaseGetterConfig): def __init__(self, indices, doc_type, per_limit=DefaultVal.per_limit, max_limit=DefaultVal.max_limit, scroll="1m", query_body=None, return_source=True, max_retry=DefaultVal.max_retry, random_min_sleep=DefaultVal.random_min_sleep, random_max_sleep=DefaultVal.random_max_sleep, filter_=None, **kwargs): """ :param indices: elasticsearch indices :param doc_type: elasticsearch doc_type :param per_limit: how many items to get per request :param max_limit: get at most max_limit items, if not set, get all :param scroll: default is "1m" :param query_body: default is '{"size": "per_limit", "query": {"match_all": {}}}' :param return_source: if set to True, will return [item , ..., itemN], item is the "_source" object if set to False, will return whatever elasticsearch return, i.e {"hits": {"total": ...}} :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at 
most random_min_sleep seconds before request again :param filter_: run "transform --help" to see command line interface explanation for detail, only work if return_source is False :param kwargs: Example: body = { "size": 100, "_source": { "includes": ["likeCount", "id", "title"] } } es_config = RESConfig("post20170630", "news", max_limit=1000, query_body=body) es_getter = ProcessFactory.create_getter(es_config) async for items in es_getter: print(item) """ super().__init__() if not main_config.has_es_configured: raise ValueError("You must config es_hosts before using Elasticsearch, Please edit configure file: %s" % (main_config.ini_path, )) if not query_body: query_body = { "size": per_limit, "query": { "match_all": {} } } self.query_body = query_body self.indices = indices self.doc_type = doc_type self.per_limit = per_limit self.max_limit = max_limit self.scroll = scroll self.es_client = get_es_client() self.return_source = return_source self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.filter = filter_ class RJsonConfig(BaseGetterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_r, encoding=DefaultVal.default_encoding, per_limit=DefaultVal.per_limit, max_limit=DefaultVal.max_limit, filter_=None, **kwargs): """ :param filename: line by line json file to read :param mode: file open mode, i.e "r" :param encoding: file encoding i.e "utf8" :param per_limit: how many items to get per time :param max_limit: get at most max_limit items, if not set, get all :param filter_: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: json_config = RJsonConfig("./result.json") json_getter = ProcessFactory.create_getter(json_config) async for items in json_getter: print(items) # both async generator and generator implemented for items in json_getter: print(items) """ super().__init__() self.filename = filename self.mode = mode self.encoding = encoding self.per_limit = per_limit self.max_limit = max_limit self.filter = filter_ class RXLSXConfig(BaseGetterConfig): def __init__(self, filename, per_limit=DefaultVal.per_limit, max_limit=DefaultVal.max_limit, sheet_index=0, filter_=None, **kwargs): """ :param filename: filename to read :param per_limit: how many items to get per time :param max_limit: get at most max_limit items, if not set, get all :param sheet_index: which sheet to get, 0 means 0th sheet :param filter_: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: xlsx_config = RXLSXConfig("./result.xlsx") xlsx_getter = ProcessFactory.create_getter(xlsx_config) async for items in xlsx_getter: print(items) # both async generator and generator implemented for items in xlsx_getter: print(items) """ super().__init__() self.filename = filename self.per_limit = per_limit self.max_limit = max_limit self.sheet_index = sheet_index self.filter = filter_ class RAPIBulkConfig(BaseGetterConfig): def __init__(self, sources, interval=DefaultVal.interval, concurrency=main_config["main"].getint("concurrency"), filter_=None, return_fail=False, **kwargs): """ :param sources: an iterable object, each item must be "url" or instance of RAPIConfig :param interval: integer or float, each time you call async generator, you will wait for "interval" seconds and get all items fetch during this "interval" :param concurrency: how many concurrency task run, default read from config file, if concurrency set, only string(url) in "sources" will work with this 
concurrency level, RAPIConfig instance won't :param filter_: run "transform --help" to see command line interface explanation for detail :param return_fail: if set to True, for each iteration, will return a tuple, api_getter = ProcessFactory.create_getter(RAPIBulkConfig([...])) async for items, bad_objects in getter: A = bad_objects[0] A.response: -> json object: '{"appCode": "weixinpro", "dataType": "post", "message": "param error", "retcode": "100005"}', if fail in request, response will be None A.tag: -> tag you pass to RAPIConfig A.source: -> source you pass to RAPIConfig :param kwargs: Example: sources = ["http://....", "http://....", "http://....", RAPIConfig("http://....")] bulk_config = RAPUBulkConfig(sources) bulk_getter = ProcessFactory.create_getter(bulk_config) async for items in bulk_getter: print(items) """ super().__init__() self.configs = (self.to_config(i) for i in sources) self.interval = interval self.concurrency = concurrency self.session = session_manger._generate_session(concurrency_limit=concurrency) self.filter = filter_ self.return_fail = return_fail def to_config(self, item): if isinstance(item, RAPIConfig): return item else: return RAPIConfig(item, session=self.session, filter_=self.filter, return_fail=self.return_fail) def __del__(self): if inspect.iscoroutinefunction(self.session.close): asyncio.ensure_future(self.session.close()) else: self.session.close() class RRedisConfig(BaseGetterConfig): def __init__(self, key, key_type="LIST", per_limit=DefaultVal.per_limit, max_limit=DefaultVal.max_limit, filter_=None, max_retry=DefaultVal.max_retry, random_min_sleep=DefaultVal.random_min_sleep, random_max_sleep=DefaultVal.random_max_sleep, host=main_config["redis"].get("host"), port=main_config["redis"].getint("port"), db=main_config["redis"].getint("db"), password=main_config["redis"].get("password"), timeout=main_config["redis"].getint("timeout"), encoding=main_config["redis"].get("encoding"), need_del=main_config["redis"].getboolean("need_del"), direction=main_config["redis"].get("direction"), compress=main_config["redis"].getboolean("compress"), **kwargs): """ :param key: redis key to get data :param key_type: redis data type to operate, current only support LIST, HASH :param per_limit: how many items to get per time :param max_limit: get at most max_limit items, if not set, get all :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param filter_: run "transform --help" to see command line interface explanation for detail :param host: redis host -> str :param port: redis port -> int :param db: redis database number -> int :param password: redis password -> int :param timeout: timeout per redis connection -> float :param encoding: redis object encoding -> str :param need_del: whether need to del the key after get object from redis -> boolean :param direction: "L" or "R", left to right or roght to left :param compress: whether compress data use zlib before write to redis -> boolean :param kwargs: Example: redis_config = RRedisConfig("my_key") redis_getter = ProcessFactory.create_getter(redis_config) async for items in redis_getter: print(items) """ super().__init__() if not main_config.has_redis_configured and port <= 0: raise ValueError("You must config redis before using Redis, Please edit configure file: %s" % (main_config.ini_path, )) if 
key_type not in ("LIST", "HASH"): raise ValueError("key_type must be one of (%s)" % (str(("LIST", "HASH")), )) if not encoding: raise ValueError("You must specific encoding, since I am going to load each object in json format, " "and treat it as dictionary in python") if not password: password = None self.redis_pool_cli = None self.key = key self.host = host self.port = port self.db = db self.password = password self.encoding = encoding self.timeout = timeout self.key_type = key_type self.per_limit = per_limit self.max_limit = max_limit self.filter = filter_ self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.need_del = need_del self.name = "%s_%s->%s" % (str(host), str(port), str(key)) self.redis_read_method = self.redis_len_method = self.redis_del_method = None self.direction = direction self.compress = compress if key_type == "LIST": self.is_range = True else: self.is_range = False async def get_redis_pool_cli(self): """ :return: an async redis client """ if self.redis_pool_cli is None: kwargs = { "db": self.db, "password": self.password, "encoding": self.encoding, "timeout": self.timeout, "minsize": 1, "maxsize": 3 } if self.compress: del kwargs["encoding"] self.redis_pool_cli = await aioredis.create_redis_pool((self.host, self.port), **kwargs) if self.key_type == "LIST": self.redis_read_method = self.redis_pool_cli.lrange self.redis_len_method = self.redis_pool_cli.llen self.redis_del_method = self.redis_pool_cli.ltrim else: self.redis_read_method = self.redis_pool_cli.hgetall self.redis_len_method = self.redis_pool_cli.hlen self.redis_del_method = self.redis_pool_cli.delete return self.redis_pool_cli class RMySQLConfig(BaseGetterConfig): def __init__(self, table, per_limit=DefaultVal.per_limit, max_limit=DefaultVal.max_limit, filter_=None, max_retry=DefaultVal.max_retry, random_min_sleep=DefaultVal.random_min_sleep, random_max_sleep=DefaultVal.random_max_sleep, host=main_config["mysql"].get("host"), port=main_config["mysql"].getint("port"), user=main_config["mysql"].get("user"), password=main_config["mysql"].get("password"), database=main_config["mysql"].get("database"), encoding=main_config["mysql"].get("encoding"), loop=None, **kwargs): """ :param table: mysql table :param per_limit: how many items to get per time :param max_limit: get at most max_limit items, if not set, get all :param filter_: run "transform --help" to see command line interface explanation for detail :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param host: mysql host -> str :param port: mysql port -> int :param user: mysql user -> str :param password: mysql password -> str :param database: mysql database -> str :param charset: default utf8 -> str :param loop: async loop instance :param kwargs: Example: mysql_config = RRedisConfig("my_table") redis_getter = ProcessFactory.create_getter(mysql_config) async for items in redis_getter: print(items) """ super().__init__() if not main_config.has_mysql_configured and port <= 0: raise ValueError("You must config mysql before using MySQL, Please edit configure file: %s" % (main_config.ini_path, )) if "aiomysql" not in globals(): raise ValueError("module mysql disabled, please reinstall " "requirements with python version higher than 3.5.3 to enable it") self.table = table 
self.database = database self.max_limit = max_limit self.per_limit = per_limit self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.filter = filter_ self.name = "%s->%s" % (self.table, self.database) self.host = host self.port = port self.user = user if not password: password = '' self.password = password self.database = database self.encoding = encoding if not loop: loop = asyncio.get_event_loop() self.loop = loop self.mysql_pool_cli = self.connection = self.cursor = None async def get_mysql_pool_cli(self): """ :return: an async mysql client """ if self.mysql_pool_cli is None: self.mysql_pool_cli = await aiomysql.create_pool(host=self.host, port=self.port, user=self.user, password=self.password, db=self.database, loop=self.loop, minsize=1, maxsize=3, charset=self.encoding) self.connection = await self.mysql_pool_cli.acquire() self.cursor = await self.connection.cursor() return self.mysql_pool_cli def free_resource(self): if self.mysql_pool_cli is not None: self.mysql_pool_cli.release(self.connection) self.mysql_pool_cli.close() self.loop.create_task(self.mysql_pool_cli.wait_closed()) self.mysql_pool_cli = self.connection = self.cursor = None class RMongoConfig(BaseGetterConfig): def __init__(self, collection, per_limit=DefaultVal.per_limit, max_limit=DefaultVal.max_limit, query_body=None, max_retry=DefaultVal.max_retry, random_min_sleep=DefaultVal.random_min_sleep, random_max_sleep=DefaultVal.random_max_sleep, filter_=None, host=main_config["mongo"].get("host"), port=main_config["mongo"].getint("port"), username=main_config["mongo"].get("username"), password=main_config["mongo"].get("password"), database=main_config["mongo"].get("database"), **kwargs): """ :param collection: collection name :param per_limit: how many items to get per request :param max_limit: get at most max_limit items, if not set, get all :param query_body: search query, default None, i.e: {'i': {'$lt': 5}} :param return_source: if set to True, will return [item , ..., itemN], item is the "_source" object if set to False, will return whatever elasticsearch return, i.e {"hits": {"total": ...}} :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param filter_: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: mongo_config = RMongoConfig("my_coll") mongo_getter = ProcessFactory.create_getter(mongo_config) async for items in mongo_getter: print(item) """ super().__init__() if not main_config.has_mongo_configured: raise ValueError("You must config MongoDB before using MongoDB, Please edit configure file: %s" % (main_config.ini_path, )) if "motor" not in globals(): raise ValueError("module motor disabled, please reinstall " "requirements in linux") self.collection = collection self.query_body = query_body self.per_limit = per_limit self.max_limit = max_limit self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.filter = filter_ self.host = host self.port = port self.username = username self.password = password self.database = database self.name = "%s->%s" % (self.database, self.collection) self.client = self.cursor = None def get_mongo_cli(self): if self.client is None: kwargs = { "host": self.host, "port": self.port } if self.username: 
address = "mongodb://%s:%s@%s:%s/%s" % (self.username, self.password, kwargs["host"], str(kwargs["port"]), self.database) self.client = motor.motor_asyncio.AsyncIOMotorClient(address) else: self.client = motor.motor_asyncio.AsyncIOMotorClient(**kwargs) if self.query_body: self.cursor = self.client[self.database][self.collection].find(self.query_body) else: self.cursor = self.client[self.database][self.collection].find() return self.client PKRPL_%3LL@idataapi_transform/DataProcess/Config/ConfigUtil/WriterConfig.pyimport asyncio import aioredis import inspect try: import aiomysql except Exception as e: pass try: import motor.motor_asyncio except Exception as e: pass from .BaseConfig import BaseWriterConfig from ..ESConfig import get_es_client from ..DefaultValue import DefaultVal from ..ConnectorConfig import main_config class WCSVConfig(BaseWriterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_w, encoding=DefaultVal.default_encoding, headers=None, filter_=None, expand=None, qsn=DefaultVal.qsn, **kwargs): """ :param filename: filename to write :param mode: file open mode, i.e "w" or "a+" :param encoding: file encoding i.e "utf8" :param headers: csv headers in first row, if not set, automatically extract in first bulk of items :param filter_: run "transform --help" to see command line interface explanation for detail :param expand: run "transform --help" to see command line interface explanation for detail :param qsn: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: ... csv_config = WCSVConfig("./result.csv", encoding="utf8", headers=["likeCount", "id", "title"]) with ProcessFactory.create_writer(csv_config) as csv_writer: async for items in es_getter: # do whatever you want with items csv_writer.write(items) """ super().__init__() self.filename = filename self.encoding = encoding self.mode = mode self.headers = headers self.filter = filter_ self.expand = expand self.qsn = qsn class WESConfig(BaseWriterConfig): def __init__(self, indices, doc_type, filter_=None, expand=None, id_hash_func=DefaultVal.default_id_hash_func, appCode=None, actions=None, createDate=None, error_if_fail=True, timeout=None, max_retry=DefaultVal.max_retry, random_min_sleep=DefaultVal.random_min_sleep, random_max_sleep=DefaultVal.random_max_sleep, auto_insert_createDate=True, **kwargs): """ :param indices: elasticsearch indices :param doc_type: elasticsearch doc_type :param filter_: run "transform --help" to see command line interface explanation for detail :param expand: run "transform --help" to see command line interface explanation for detail :param id_hash_func: function to generate id_ for each item :param appCode: if not None, add appCode to each item before write to es :param actions: if not None, will set actions to user define actions, else default actions is 'index' :param createDate: if not None, add createDate to each item before write to es :param error_if_fail: if True, log to error if fail to insert to es, else log nothing :param timeout: http connection timeout when connect to es, seconds :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param auto_insert_createDate: whether insert createDate for each item automatic -> boolean :param kwargs: Example: ... 
es_config = WCSVConfig("post20170630", "news") with ProcessFactory.create_writer(es_config) as es_writer: # asyncio function must call with await await csv_writer.write(items) """ super().__init__() if not main_config.has_es_configured: raise ValueError("You must config es_hosts before using Elasticsearch, Please edit configure file: %s" % (main_config.ini_path, )) self.indices = indices self.doc_type = doc_type self.filter = filter_ self.expand = expand self.id_hash_func = id_hash_func self.es_client = get_es_client() self.app_code = appCode self.actions = actions self.create_date = createDate self.error_if_fail = error_if_fail self.timeout = timeout self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.auto_insert_createDate = auto_insert_createDate class WJsonConfig(BaseWriterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_w, encoding=DefaultVal.default_encoding, expand=None, filter_=None, new_line=DefaultVal.new_line, **kwargs): """ :param filename: filename to write :param mode: file open mode, i.e "w" or "a+" :param encoding: file encoding i.e "utf8" :param expand: run "transform --help" to see command line interface explanation for detail :param filter_: run "transform --help" to see command line interface explanation for detail :param new_line: new_line seperator for each item, default is "\n" :param kwargs: Example: ... json_config = WJsonConfig("./result.json") with ProcessFactory.create_writer(csv_config) as json_writer: async for items in es_getter: json_writer.write(items) """ super().__init__() self.filename = filename self.mode = mode self.encoding = encoding self.expand = expand self.filter = filter_ self.new_line = new_line class WTXTConfig(BaseWriterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_w, encoding=DefaultVal.default_encoding, expand=None, filter_=None, new_line=DefaultVal.new_line, join_val=DefaultVal.join_val, **kwargs): """ :param filename: filename to write :param mode: file open mode, i.e "w" or "a+" :param encoding: file encoding i.e "utf8" :param expand: run "transform --help" to see command line interface explanation for detail :param filter_: run "transform --help" to see command line interface explanation for detail :param new_line: new_line seperator for each item, default is "\n" :param join_val: space seperator for each key in each item, default is " " :param kwargs: Example: ... txt_config = WTXTConfig("./result.txt") with ProcessFactory.create_writer(txt_config) as txt_writer: async for items in es_getter: txt_writer.write(items) """ super().__init__() self.filename = filename self.mode = mode self.encoding = encoding self.expand = expand self.filter = filter_ self.new_line = new_line self.join_val = join_val class WXLSXConfig(BaseWriterConfig): def __init__(self, filename, title=DefaultVal.title, expand=None, filter_=None, **kwargs): """ :param filename: filename to write :param title: sheet title :param expand: run "transform --help" to see command line interface explanation for detail :param filter_: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: ... 
xlsx_config = WXLSXConfig("./result.xlsx") with ProcessFactory.create_writer(xlsx_config) as xlsx_writer: async for items in es_getter: xlsx_writer.write(items) """ super().__init__() self.filename = filename self.title = title self.expand = expand self.filter = filter_ class WRedisConfig(BaseWriterConfig): def __init__(self, key, key_type="LIST", filter_=None, host=main_config["redis"].get("host"), port=main_config["redis"].getint("port"), db=main_config["redis"].getint("db"), password=main_config["redis"].get("password"), timeout=main_config["redis"].getint("timeout"), encoding=main_config["redis"].get("encoding"), direction=main_config["redis"].get("direction"), max_retry=DefaultVal.max_retry, random_min_sleep=DefaultVal.random_min_sleep, random_max_sleep=DefaultVal.random_max_sleep, compress=main_config["redis"].getboolean("compress"), **kwargs): """ :param key: redis key to write data :param key_type: redis data type to operate, current only support LIST, HASH :param filter_: run "transform --help" to see command line interface explanation for detail :param host: redis host -> str :param port: redis port -> int :param db: redis database number -> int :param password: redis password -> int :param timeout: timeout per redis connection -> float :param encoding: redis object encoding -> str :param direction: "L" or "R", lpush or rpush :param compress: whether compress data use zlib before write to redis -> boolean :param kwargs: Example: redis_config = WRedisConfig("my_key") with ProcessFactory.create_writer(redis_config) as redis_writer: async for items in es_getter: await redis_writer.write(items) """ super().__init__() if not main_config.has_redis_configured and port <= 0: raise ValueError("You must config redis before using Redis, Please edit configure file: %s" % (main_config.ini_path, )) if key_type not in ("LIST", "HASH"): raise ValueError("key_type must be one of (%s)" % (str(("LIST", )), )) if not encoding: raise ValueError("You must specific encoding, since I am going to load each object in json format, " "and treat it as dictionary in python") if not password: password = None self.redis_pool_cli = None self.key = key self.host = host self.port = port self.db = db self.password = password self.encoding = encoding self.timeout = timeout self.key_type = key_type self.filter = filter_ self.name = "%s_%s->%s" % (str(host), str(port), str(key)) self.redis_write_method = None self.direction = direction self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.compress = compress if key_type == "LIST": self.is_range = True else: self.is_range = False async def get_redis_pool_cli(self): """ :return: an async redis client """ if self.redis_pool_cli is None: kwargs = { "db": self.db, "password": self.password, "encoding": self.encoding, "timeout": self.timeout, "minsize": 1, "maxsize": 3 } if self.compress: del kwargs["encoding"] self.redis_pool_cli = await aioredis.create_redis_pool((self.host, self.port), **kwargs) if self.key_type == "LIST": if self.direction == "L": self.redis_write_method = self.redis_pool_cli.lpush else: self.redis_write_method = self.redis_pool_cli.rpush else: self.redis_write_method = self.redis_pool_cli.hset return self.redis_pool_cli class WMySQLConfig(BaseWriterConfig): def __init__(self, table, filter_=None, max_retry=DefaultVal.max_retry, random_min_sleep=DefaultVal.random_min_sleep, random_max_sleep=DefaultVal.random_max_sleep, host=main_config["mysql"].get("host"), 
port=main_config["mysql"].getint("port"), user=main_config["mysql"].get("user"), password=main_config["mysql"].get("password"), database=main_config["mysql"].get("database"), encoding=main_config["mysql"].get("encoding"), loop=None, **kwargs): """ :param table: mysql table :param filter_: run "transform --help" to see command line interface explanation for detail :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param host: mysql host -> str :param port: mysql port -> int :param user: mysql user -> str :param password: mysql password -> str :param database: mysql database -> str :param charset: default utf8 -> str :param loop: async loop instance :param kwargs: Example: mysql_config = WMySQLConfig("my_table") mysql_writer = ProcessFactory.create_writer(mysql_config) async for items in redis_getter: await mysql_writer.write(items) """ super().__init__() if not main_config.has_mysql_configured and port <= 0: raise ValueError("You must config mysql before using MySQL, Please edit configure file: %s" % (main_config.ini_path, )) if "aiomysql" not in globals(): raise ValueError("module mysql disabled, please reinstall " "requirements with python version higher than 3.5.3 to enable it") self.table = table self.database = database self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.filter = filter_ self.name = "%s->%s" % (self.table, self.database) self.host = host self.port = port self.user = user if not password: password = '' self.password = password self.database = database self.encoding = encoding if not loop: loop = asyncio.get_event_loop() self.loop = loop self.mysql_pool_cli = self.connection = self.cursor = None async def get_mysql_pool_cli(self): """ :return: an async mysql client """ if self.mysql_pool_cli is None: self.mysql_pool_cli = await aiomysql.create_pool(host=self.host, port=self.port, user=self.user, password=self.password, db=self.database, loop=self.loop, minsize=1, maxsize=3, charset=self.encoding) self.connection = await self.mysql_pool_cli.acquire() self.cursor = await self.connection.cursor() return self.mysql_pool_cli def free_resource(self): if self.mysql_pool_cli is not None: self.mysql_pool_cli.release(self.connection) self.mysql_pool_cli.close() self.loop.create_task(self.mysql_pool_cli.wait_closed()) self.mysql_pool_cli = self.connection = self.cursor = None class WMongoConfig(BaseWriterConfig): def __init__(self, collection, id_hash_func=DefaultVal.default_id_hash_func, per_limit=DefaultVal.per_limit, max_limit=DefaultVal.max_limit, query_body=None, max_retry=DefaultVal.max_retry, random_min_sleep=DefaultVal.random_min_sleep, random_max_sleep=DefaultVal.random_max_sleep, filter_=None, host=main_config["mongo"].get("host"), port=main_config["mongo"].getint("port"), username=main_config["mongo"].get("username"), password=main_config["mongo"].get("password"), database=main_config["mongo"].get("database"), auto_insert_createDate=False, createDate=None, **kwargs): """ :param collection: collection name :param id_hash_func: function to generate id_ for each item, only if "_id" not in item will I use 'id_hash_func' to generate "_id" :param per_limit: how many items to get per request :param max_limit: get at most max_limit items, if not set, get all :param query_body: search query, 
default None, i.e: {'i': {'$lt': 5}} :param return_source: if set to True, will return [item , ..., itemN], item is the "_source" object if set to False, will return whatever elasticsearch return, i.e {"hits": {"total": ...}} :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param filter_: run "transform --help" to see command line interface explanation for detail :param host: mongodb host -> str :param port: mongodb port -> int :param user: mongodb user -> str :param password: mongodb password -> str :param database: mongodb database -> str :param createDate: if not None, add createDate to each item before write to mongodb :param auto_insert_createDate: whether insert createDate for each item automatic -> boolean :param kwargs: Example: data = [json_obj, json_obj, json_obj] mongo_config = WMongoConfig("my_coll") async with ProcessFactory.create_writer(mongo_config) as mongo_writer: await mongo_writer.write(data) """ super().__init__() if not main_config.has_mongo_configured: raise ValueError("You must config MongoDB before using MongoDB, Please edit configure file: %s" % (main_config.ini_path, )) if "motor" not in globals(): raise ValueError("module motor disabled, please reinstall " "requirements in linux") self.collection = collection self.query_body = query_body self.per_limit = per_limit self.max_limit = max_limit self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.filter = filter_ self.host = host self.port = port self.username = username self.password = password self.database = database self.name = "%s->%s" % (self.database, self.collection) self.id_hash_func = id_hash_func self.auto_insert_createDate = auto_insert_createDate self.createDate = createDate self.client = self.collection_cli = None def get_mongo_cli(self): if self.client is None: kwargs = { "host": self.host, "port": self.port } if self.username: self.client = motor.motor_asyncio.AsyncIOMotorClient( "mongodb://%s:%s@%s:%s/%s" % (self.username, self.password, kwargs["host"], str(kwargs["port"]), self.database)) else: self.client = motor.motor_asyncio.AsyncIOMotorClient(**kwargs) self.collection_cli = self.client[self.database][self.collection] return self.client PK`K<idataapi_transform/DataProcess/Config/ConfigUtil/__init__.pyPKLsHuZM)M)6idataapi_transform/DataProcess/DataGetter/APIGetter.pyimport re import json import random import logging import asyncio import traceback from .BaseGetter import BaseGetter headers = { "Accept-Encoding": "gzip", "Connection": "close" } class SourceObject(object): def __init__(self, response, tag, source, error_url): """ When error occur :param response: error response body :param tag: tag user pass in :param source: source url user pass in :param error_url: current url elicit error """ self.response = response self.tag = tag self.source = source self.error_url = error_url class APIGetter(BaseGetter): def __init__(self, config): super().__init__() self.config = config self.base_url = self.config.source self.retry_count = 0 self.curr_size = 0 self.responses = list() self.bad_responses = list() self.done = False self.page_token = "" self.miss_count = 0 self.total_count = 0 def init_val(self): self.base_url = self.config.source self.retry_count = 0 self.curr_size = 0 self.responses = list() self.bad_responses 
= list() self.done = False self.page_token = "" self.miss_count = 0 self.total_count = 0 def generate_sub_func(self): def sub_func(match): return match.group(1) + self.page_token + match.group(3) return sub_func def update_base_url(self, key="pageToken"): if self.base_url[-1] == "/": self.base_url = self.base_url[:-1] elif self.base_url[-1] == "?": self.base_url = self.base_url[:-1] key += "=" if key not in self.base_url: if "?" not in self.base_url: self.base_url = self.base_url + "?" + key + self.page_token else: self.base_url = self.base_url + "&" + key + self.page_token else: self.base_url = re.sub("(" + key + ")(.+?)($|&)", self.generate_sub_func(), self.base_url) def __aiter__(self): return self async def __anext__(self): if self.done: logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.source, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration while True: try: async with self.config.session.get(self.base_url, headers=headers) as resp: text = await resp.text() result = json.loads(text) if "data" not in result: if "retcode" not in result or result["retcode"] not in ("100002", "100301", "100103"): logging.error("retry: %d, %s: %s" % (self.retry_count, self.base_url, str(result))) if self.retry_count == self.config.max_retry: source_obj = SourceObject(result, self.config.tag, self.config.source, self.base_url) self.bad_responses.append(source_obj) except Exception as e: self.retry_count += 1 logging.error("retry: %d, %s: %s" % (self.retry_count, str(e), self.base_url)) await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) if self.retry_count <= self.config.max_retry: continue else: # fail logging.error("Give up, After retry: %d times, Unable to get url: %s, total get %d items, " "total filtered: %d items, error: %s" % (self.base_url, self.config.max_retry, self.total_count, self.miss_count, str(traceback.format_exc()))) self.done = True if self.config.return_fail: self.bad_responses.append(SourceObject(None, self.config.tag, self.config.source, self.base_url)) return self.clear_and_return() elif self.responses: return self.clear_and_return() else: return await self.__anext__() if "data" in result: # success self.retry_count = 0 origin_length = len(result["data"]) self.total_count += origin_length if self.config.filter: curr_response = [self.config.filter(i) for i in result["data"]] curr_response = [i for i in curr_response if i] self.miss_count += origin_length - len(curr_response) else: curr_response = result["data"] self.responses.extend(curr_response) self.curr_size += len(curr_response) # get next page if success, retry if fail if "pageToken" in result: if not result["pageToken"]: self.done = True if self.need_return(): return self.clear_and_return() self.page_token = str(result["pageToken"]) self.update_base_url() elif "retcode" in result and result["retcode"] in ("100002", "100301", "100103"): self.done = True if self.need_return(): return self.clear_and_return() logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.source, self.total_count, self.miss_count)) return await self.__anext__() else: self.retry_count += 1 if self.retry_count > self.config.max_retry: logging.error("Give up, After retry: %d times, Unable to get url: %s, total get %d items, " "total filtered: %d items" % (self.config.max_retry, self.base_url, self.total_count, self.miss_count)) self.done = True if self.need_return(): return self.clear_and_return() await 
asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) return await self.__anext__() if self.config.max_limit and self.curr_size > self.config.max_limit: self.done = True return self.clear_and_return() elif len(self.responses) >= self.config.per_limit: return self.clear_and_return() elif self.done: # buffer has empty data, and done fetching return await self.__anext__() def __iter__(self): raise ValueError("APIGetter must be used with async generator, not normal generator") def clear_and_return(self): if self.config.return_fail: resp, bad_resp = self.responses, self.bad_responses self.responses, self.bad_responses = list(), list() return resp, bad_resp else: resp = self.responses self.responses = list() return resp def need_return(self): return self.responses or (self.config.return_fail and (self.responses or self.bad_responses)) class APIBulkGetter(BaseGetter): def __init__(self, config): super().__init__() self.config = config self.api_configs = self.to_generator(self.config.configs) self.pending_tasks = list() self.buffers = list() self.bad_buffers = list() self.success_task = 0 self.curr_size = 0 self.curr_bad_size = 0 @staticmethod def to_generator(items): for i in items: yield i async def fetch_items(self, api_config): if api_config.return_fail: async for items, bad_items in APIGetter(api_config): if self.config.return_fail: self.bad_buffers.extend(bad_items) self.buffers.extend(items) else: async for items in APIGetter(api_config): self.buffers.extend(items) def fill_tasks(self): if len(self.pending_tasks) >= self.config.concurrency: return for api_config in self.api_configs: self.pending_tasks.append(self.fetch_items(api_config)) if len(self.pending_tasks) >= self.config.concurrency: return def __aiter__(self): return self async def __anext__(self): self.fill_tasks() while self.pending_tasks: done, pending = await asyncio.wait(self.pending_tasks, timeout=self.config.interval) self.pending_tasks = list(pending) self.success_task += len(done) if self.buffers or (self.config.return_fail and (self.buffers or self.bad_buffers)): return self.clear_and_return() else: # after interval seconds, no item fetched self.fill_tasks() logging.info("After %.2f seconds, no new item fetched, current done task: %d, pending tasks: %d" % (float(self.config.interval), self.success_task, len(self.pending_tasks))) continue ret_log = "APIBulkGetter Done, total perform: %d tasks, fetch: %d items" % (self.success_task, self.curr_size) if self.config.return_fail: ret_log += " fail: %d items" % (self.curr_bad_size, ) logging.info(ret_log) raise StopAsyncIteration def __iter__(self): raise ValueError("APIBulkGetter must be used with async generator, not normal generator") def clear_and_return(self): if self.config.return_fail: buffers, bad_buffers = self.buffers, self.bad_buffers self.curr_size += len(self.buffers) self.curr_bad_size += len(self.bad_buffers) self.buffers, self.bad_buffers = list(), list() return buffers, bad_buffers else: buffers = self.buffers self.curr_size += len(self.buffers) self.buffers = list() return buffers PKYKUYUU7idataapi_transform/DataProcess/DataGetter/BaseGetter.pyimport abc from ..Meta.BaseDataProcess import BaseDataProcess class BaseGetter(BaseDataProcess, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): """ :param config config contains attribute: source: where to read data per_limit: return at most per_limit data each time max_limit: return at most max_limit data total """ pass @abc.abstractmethod def 
__aiter__(self): return self @abc.abstractmethod async def __anext__(self): pass PKaLVz  6idataapi_transform/DataProcess/DataGetter/CSVGetter.pyimport csv import sys import logging from .BaseGetter import BaseGetter if sys.platform == "linux": csv.field_size_limit(sys.maxsize) class CSVGetter(BaseGetter): def __init__(self, config): super().__init__() self.config = config self.f_in = open(self.config.filename, self.config.mode, encoding=self.config.encoding) self.reader = csv.DictReader(self.f_in) self.done = False self.responses = list() self.miss_count = 0 self.total_count = 0 def init_val(self): self.done = False self.responses = list() self.f_in.seek(0, 0) self.miss_count = 0 self.total_count = 0 def __aiter__(self): return self async def __anext__(self): if self.done: logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration for row in self.reader: self.total_count += 1 if self.config.filter: row = self.config.filter(row) if not row: self.miss_count += 1 continue self.responses.append(row) if len(self.responses) > self.config.per_limit: return self.clear_and_return() if self.config.max_limit and len(self.responses) > self.config.max_limit: self.done = True return self.clear_and_return() if self.responses: self.done = True return self.clear_and_return() logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration def __iter__(self): for row in self.reader: self.total_count += 1 if self.config.filter: row = self.config.filter(row) if not row: self.miss_count += 1 continue self.responses.append(row) if len(self.responses) > self.config.per_limit: yield self.clear_and_return() if self.config.max_limit and len(self.responses) > self.config.max_limit: yield self.clear_and_return() break if self.responses: yield self.responses logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() def clear_and_return(self): resp = self.responses self.responses = list() return resp PK]LTmm5idataapi_transform/DataProcess/DataGetter/ESGetter.pyimport asyncio import random import logging import traceback from .BaseGetter import BaseGetter class ESScrollGetter(BaseGetter): def __init__(self, config): super().__init__(self) self.config = config self.es_client = config.es_client self.total_size = None self.result = None self.scroll_id = None self.miss_count = 0 self.total_count = 0 def __aiter__(self): return self def init_val(self): self.total_size = None self.result = None self.scroll_id = None self.miss_count = 0 self.total_count = 0 async def __anext__(self, retry=1): if self.total_size is None: self.result = await self.es_client.search(self.config.indices, self.config.doc_type, scroll=self.config.scroll, body=self.config.query_body) self.total_size = self.result['hits']['total'] self.total_size = self.config.max_limit if (self.config.max_limit and self.config.max_limit < self.result['hits']['total']) else self.total_size self.total_count += len(self.result['hits']['hits']) logging.info("Get %d items from %s, percentage: %.2f%%" % (len(self.result['hits']['hits']), self.config.indices + "->" + self.config.doc_type, (self.total_count / self.total_size * 100) if self.total_size else 0)) origin_length = len(self.result['hits']['hits']) if self.config.return_source: 
results = [i["_source"] for i in self.result['hits']['hits']] else: results = self.result if self.config.filter: results = [self.config.filter(i) for i in results] results = [i for i in results if i] self.miss_count += origin_length - len(results) self.get_score_id_and_clear_result() return results if self.scroll_id and self.total_count < self.total_size: try: self.result = await self.es_client.scroll(scroll_id=self.scroll_id, scroll=self.config.scroll) except Exception as e: if retry < self.config.max_retry: logging.error("retry: %d, %s" % (retry, str(e))) await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) return await self.__anext__(retry+1) else: logging.error("Give up es getter, After retry: %d times, still fail to get result: %s, " "total get %d items, total filtered: %d items, reason: %s" % (self.config.max_retry, self.config.indices + "->" + self.config.doc_type, self.total_count, self.miss_count, traceback.format_exc())) raise StopAsyncIteration logging.info("Get %d items from %s, filtered: %d items, percentage: %.2f%%" % (len(self.result['hits']['hits']), self.config.indices + "->" + self.config.doc_type, self.miss_count, (self.total_count / self.total_size * 100) if self.total_size else 0)) origin_length = len(self.result['hits']['hits']) self.total_count += origin_length if self.config.return_source: results = [i["_source"] for i in self.result['hits']['hits']] else: results = self.result if self.config.filter: results = [self.config.filter(i) for i in results] results = [i for i in results if i] self.miss_count += origin_length - len(results) self.get_score_id_and_clear_result() if origin_length > 0: return results else: # if scroll empty item, means no more next page logging.info("empty result, terminating scroll, scroll id: %s" % (str(self.scroll_id), )) logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.indices + "->" + self.config.doc_type, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration async def delete_all(self): """ inefficient delete """ body = { "query": { "match_all": {} } } result = await self.config.es_client.delete_by_query(index=self.config.indices, doc_type=self.config.doc_type, body=body, params={"conflicts": "proceed"}) return result def __iter__(self): raise ValueError("ESGetter must be used with async generator, not normal generator") def get_score_id_and_clear_result(self): if "_scroll_id" in self.result and self.result["_scroll_id"]: self.scroll_id = self.result["_scroll_id"] else: self.scroll_id = None self.result = dict() PKbL¶hh h 7idataapi_transform/DataProcess/DataGetter/JsonGetter.pyimport json import logging from .BaseGetter import BaseGetter class JsonGetter(BaseGetter): def __init__(self, config): super().__init__(self) self.config = config self.responses = list() self.line_num = 0 self.done = False self.f_in = open(self.config.filename, self.config.mode, encoding=self.config.encoding) self.miss_count = 0 self.total_count = 0 def init_val(self): self.responses = list() self.line_num = 0 self.done = False self.f_in.seek(0, 0) self.miss_count = 0 self.total_count = 0 def __aiter__(self): return self async def __anext__(self): if self.done: logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration for line in self.f_in: self.line_num += 1 try: json_obj = json.loads(line) except json.decoder.JSONDecodeError: 
logging.error("JSONDecodeError. give up. line: %d" % (self.line_num, )) continue self.total_count += 1 if self.config.filter: json_obj = self.config.filter(json_obj) if not json_obj: self.miss_count += 1 continue self.responses.append(json_obj) if len(self.responses) > self.config.per_limit: return self.clear_and_return() if self.config.max_limit and len(self.responses) > self.config.max_limit: self.done = True return self.clear_and_return() self.done = True if self.responses: return self.clear_and_return() logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration def __iter__(self): for line in self.f_in: self.line_num += 1 try: json_obj = json.loads(line) except json.decoder.JSONDecodeError: logging.error("JSONDecodeError. give up. line: %d" % (self.line_num, )) continue self.total_count += 1 if self.config.filter: json_obj = self.config.filter(json_obj) if not json_obj: self.miss_count += 1 continue self.responses.append(json_obj) if len(self.responses) > self.config.per_limit: yield self.clear_and_return() if self.config.max_limit and len(self.responses) > self.config.max_limit: self.done = True yield self.clear_and_return() break if self.responses: yield self.clear_and_return() logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() def __del__(self): self.f_in.close() def clear_and_return(self): resp = self.responses self.responses = list() return resp PK^Lm8idataapi_transform/DataProcess/DataGetter/MongoGetter.pyimport asyncio import traceback import random import logging from .BaseGetter import BaseGetter class MongoGetter(BaseGetter): def __init__(self, config): super().__init__(self) self.config = config self.responses = list() self.miss_count = 0 self.total_count = 0 self.total_size = None self.need_finish = False def init_val(self): self.responses = list() self.miss_count = 0 self.total_count = 0 self.total_size = None self.need_finish = False def __aiter__(self): return self async def __anext__(self): self.config.get_mongo_cli() # init mongo pool if self.need_finish: await self.finish() if self.total_size is None: self.total_size = await self.get_total_size() if self.total_count < self.total_size: await self.fetch_per_limit() return self.clear_and_return() # reach here, means done await self.finish() def __iter__(self): raise ValueError("MongoGetter must be used with async generator, not normal generator") async def finish(self): logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.name, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration async def get_total_size(self): if hasattr(self.config.cursor, "count"): size = await self.config.cursor.count() else: size = await self.config.client[self.config.database][self.config.collection].count_documents({} if not self.config.query_body else self.config.query_body) size = min(size, self.config.max_limit if self.config.max_limit is not None else size) if size == 0: await self.finish() return size async def fetch_per_limit(self): curr_size = 0 try_time = 0 get_all = True while try_time < self.config.max_retry: try: async for document in self.config.cursor: curr_size += 1 self.responses.append(document) if curr_size >= self.config.per_limit: get_all = False break if get_all: # get all item if self.total_count + curr_size < self.total_size: 
logging.error("get all items: %d, but not reach 'total_size': %d" % (self.total_count + curr_size, self.total_size)) self.need_finish = True break except Exception as e: try_time += 1 if try_time < self.config.max_retry: logging.error("retry: %d, %s" % (try_time, str(e))) await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) else: logging.error("Give up MongoGetter getter: %s, After retry: %d times, still fail, " "total get %d items, total filtered: %d items, reason: %s" % (self.config.name, self.config.max_retry, self.total_count, self.miss_count, str(traceback.format_exc()))) self.need_finish = True self.total_count += len(self.responses) curr_miss_count = 0 if self.config.filter: target_results = list() for each in self.responses: each = self.config.filter(each) if each: target_results.append(each) else: curr_miss_count += 1 self.responses = target_results self.miss_count += curr_miss_count logging.info("Get %d items from %s, filtered: %d items, percentage: %.2f%%" % (len(self.responses), self.config.name, curr_miss_count, (self.total_count / self.total_size * 100) if self.total_size else 0)) def clear_and_return(self): resp = self.responses self.responses = list() return resp PKPL&qq8idataapi_transform/DataProcess/DataGetter/MySQLGetter.pyimport json import asyncio import traceback import random import logging from .BaseGetter import BaseGetter class MySQLGetter(BaseGetter): def __init__(self, config): super().__init__(self) self.config = config self.responses = list() self.miss_count = 0 self.total_count = 0 self.total_size = None self.key_fields = list() self.key_fields_map = dict() self.need_finish = False def init_val(self): self.responses = list() self.miss_count = 0 self.total_count = 0 self.total_size = None self.key_fields = list() self.key_fields_map = dict() self.need_finish = False def __aiter__(self): return self async def __anext__(self): await self.config.get_mysql_pool_cli() # init mysql pool if self.need_finish: await self.finish() if self.total_size is None: self.total_size, self.key_fields = await self.get_total_size_and_key_field() if self.total_count < self.total_size: await self.fetch_per_limit() return self.clear_and_return() # reach here, means done await self.finish() def __iter__(self): raise ValueError("MySQLGetter must be used with async generator, not normal generator") async def finish(self): logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.name, self.total_count, self.miss_count)) self.init_val() self.config.free_resource() raise StopAsyncIteration async def get_total_size_and_key_field(self): await self.config.cursor.execute("DESC %s" % (self.config.table, )) result = await self.config.cursor.fetchall() field = result[0][0] await self.config.cursor.execute("select count(%s) from %s" % (field, self.config.table)) result = await self.config.cursor.fetchone() # key field await self.config.cursor.execute("DESC %s" % (self.config.table, )) results = await self.config.cursor.fetchall() key_fields = list() for each in results: key_fields.append(each[0]) if "tinyint" in each[1]: self.key_fields_map[each[0]] = bool elif "text" in each[1]: self.key_fields_map[each[0]] = str # or json key_fields = list(i[0] for i in results) return result[0], key_fields async def fetch_per_limit(self): results = list() try_time = 0 while try_time < self.config.max_retry: try: await self.config.cursor.execute("SELECT * FROM %s LIMIT %d,%d" % (self.config.table, self.total_count, self.total_count 
+ self.config.per_limit)) results = await self.config.cursor.fetchall() break except Exception as e: try_time += 1 if try_time < self.config.max_retry: logging.error("retry: %d, %s" % (try_time, str(e))) await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) else: logging.error("Give up MySQL getter: %s, After retry: %d times, still fail, " "total get %d items, total filtered: %d items, reason: %s" % (self.config.name, self.config.max_retry, self.total_count, self.miss_count, str(traceback.format_exc()))) self.need_finish = True self.responses = [self.decode(i) for i in results] curr_miss_count = 0 if self.config.filter: target_results = list() for each in results: each = self.config.filter(each) if each: target_results.append(each) else: curr_miss_count += 1 self.responses = target_results self.miss_count += curr_miss_count self.total_count += len(results) logging.info("Get %d items from %s, filtered: %d items, percentage: %.2f%%" % (len(results), self.config.name, curr_miss_count, (self.total_count / self.total_size * 100) if self.total_size else 0)) def decode(self, item): """ :param item: tuple :return: dict """ ret_dict = dict() index = 0 for key in self.key_fields: if key in self.key_fields_map: if self.key_fields_map[key] is bool: ret_dict[key] = bool(item[index]) elif item[index] is None: ret_dict[key] = None elif item[index][0] in ("{", "["): try: val = json.loads(item[index]) except json.decoder.JSONDecodeError: val = item[index] ret_dict[key] = val else: ret_dict[key] = item[index] else: ret_dict[key] = item[index] index += 1 return ret_dict def clear_and_return(self): resp = self.responses self.responses = list() return resp PK)cLvv!`8idataapi_transform/DataProcess/DataGetter/RedisGetter.pyimport asyncio import random import logging import traceback import json import zlib from .BaseGetter import BaseGetter class RedisGetter(BaseGetter): def __init__(self, config): super().__init__(self) self.config = config self.is_range = self.config.is_range self.need_del = self.config.need_del self.responses = list() self.done = False self.total_size = None self.miss_count = 0 self.total_count = 0 self.redis_object_length = 0 def init_val(self): self.responses = list() self.done = False self.miss_count = 0 self.total_count = 0 self.redis_object_length = 0 self.total_size = None def decode(self, loaded_object): if self.config.compress: return zlib.decompress(loaded_object).decode(self.config.encoding) else: return json.loads(loaded_object) def __aiter__(self): return self async def __anext__(self, retry=1): await self.config.get_redis_pool_cli() # init redis pool if self.is_range and self.total_size is None: self.redis_object_length = await self.config.redis_len_method(self.config.key) self.total_size = self.config.max_limit if (self.config.max_limit and self.config.max_limit < self.redis_object_length) else self.redis_object_length if self.done: logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.name, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration if self.is_range: if self.config.direction == "L": left = self.total_count right = self.total_count + self.config.per_limit - 1 else: left = self.total_size - self.config.per_limit - 1 if left < 0: left = 0 right = left + self.config.per_limit try: self.responses = await self.config.redis_read_method(self.config.key, left, right) self.responses = [self.decode(i) for i in self.responses] except Exception as e: if retry < 
self.config.max_retry: logging.error("retry: %d, %s" % (retry, str(e))) await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) return await self.__anext__(retry+1) else: logging.error("Give up redis getter, After retry: %d times, still fail to get key: %s, " "total get %d items, total filtered: %d items, error: %s" % (self.config.max_retry, self.config.key, self.total_count, self.miss_count, str(traceback.format_exc()))) raise StopAsyncIteration if len(self.responses) < self.config.per_limit or not self.responses or self.total_count + len(self.responses) >= self.total_size: self.done = True if self.need_del: await self.config.redis_del_method(self.config.key, 0, -1) else: try: self.responses = await self.config.redis_read_method(self.config.key) self.responses = [self.decode(i) for i in self.responses.values()][:self.total_size] except Exception as e: if retry < self.config.max_retry: logging.error("retry: %d, %s" % (retry, str(e))) await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) return await self.__anext__(retry+1) else: logging.error("Give up redis getter, After retry: %d times, still fail to get key: %s, " "total get %d items, total filtered: %d items, reason: %s" % (self.config.max_retry, self.config.key, self.total_count, self.miss_count, str(traceback.format_exc()))) raise StopAsyncIteration if self.config.max_limit: self.responses = self.responses[:self.config.max_limit] self.done = True if self.need_del: await self.config.redis_del_method(self.config.key) current_response_length = len(self.responses) curr_miss_count = 0 self.total_count += current_response_length if self.config.filter: target_responses = list() for i in self.responses: if self.config.filter: i = self.config.filter(i) if i: target_responses.append(i) else: curr_miss_count += 1 self.responses = target_responses self.miss_count += curr_miss_count if self.is_range: logging.info("Get %d items from %s, filtered: %d items, percentage: %.2f%%" % (current_response_length, self.config.name, curr_miss_count, (self.total_count / self.total_size * 100) if self.total_size else 0)) return self.clear_and_return() def __iter__(self): raise ValueError("RedisGetter must be used with async generator, not normal generator") def clear_and_return(self): resp = self.responses self.responses = list() return resp PK)cLl<<7idataapi_transform/DataProcess/DataGetter/XLSXGetter.pyimport logging from openpyxl import load_workbook from .BaseGetter import BaseGetter class XLSXGetter(BaseGetter): def __init__(self, config): super().__init__() self.config = config self.wb = load_workbook(filename=self.config.filename, read_only=True) if not self.wb.worksheets: raise ValueError("Empty file: %s" % (self.config.filename, )) self.sheet = self.wb.worksheets[self.config.sheet_index] self.row_iter = self.sheet.rows self.headers = self.generate_headers() self.max_row = self.sheet.max_row if self.config.max_limit and self.config.max_limit > self.max_row: self.max_row = self.config.max_limit + 1 # add first headers self.row_num = 0 self.responses = list() self.curr_size = 0 self.done = False self.miss_count = 0 self.total_count = 0 def init_val(self): self.row_num = 0 self.responses = list() self.curr_size = 0 self.done = False self.miss_count = 0 self.total_count = 0 self.row_iter = self.sheet.rows def __aiter__(self): return self async def __anext__(self): if self.done: logging.info("get source done: %s, total get %d items, total filtered: %d items" % 
(self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration while self.row_num < self.max_row: if self.row_num == 0: self.row_num += 1 continue self.row_num += 1 self.total_count += 1 row = self.get_next_row() if self.config.filter: row = self.config.filter(row) if not row: self.miss_count += 1 continue self.responses.append(row) if len(self.responses) > self.config.per_limit: self.curr_size += len(self.responses) return self.clear_and_return() if self.responses: self.done = True return self.clear_and_return() logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration def generate_headers(self): keys = list() try: row = next(self.row_iter) for each in row: keys.append(each.value) except StopIteration: pass return keys def get_next_row(self): ret_item = dict() r = next(self.row_iter) for key, cell in zip(self.headers, r): ret_item[key] = cell.value return ret_item def __iter__(self): for row_num in range(self.max_row): if row_num == 0: continue row_num += 1 self.total_count += 1 row = self.get_next_row() if self.config.filter: row = self.config.filter(row) if not row: self.miss_count += 1 continue self.responses.append(row) if len(self.responses) > self.config.per_limit: self.curr_size += len(self.responses) yield self.clear_and_return() if self.responses: yield self.responses logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() def __del__(self): self.wb.close() def clear_and_return(self): resp = self.responses self.responses = list() return resp PK2K5idataapi_transform/DataProcess/DataGetter/__init__.pyPK{KV7idataapi_transform/DataProcess/DataWriter/BaseWriter.pyimport abc from ..Meta.BaseDataProcess import BaseDataProcess class BaseWriter(BaseDataProcess, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): """ :param config """ pass @abc.abstractmethod async def write(self, responses): pass @abc.abstractmethod def __enter__(self): pass @abc.abstractmethod def __exit__(self, exc_type, exc_val, exc_tb): pass PKU+LO O 6idataapi_transform/DataProcess/DataWriter/CSVWriter.pyimport csv import types import logging from .BaseWriter import BaseWriter class CSVWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.f_out = open(self.config.filename, self.config.mode, encoding=self.config.encoding, newline="") self.f_csv = None self.headers = dict() if not self.config.headers else self.config.headers self.total_miss_count = 0 self.success_count = 0 def write(self, responses): miss_count = 0 # filter if self.config.filter: new_result = list() for each_response in responses: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue new_result.append(each_response) responses = new_result self.total_miss_count += miss_count # all filtered if not responses: logging.info("%s write 0 item, filtered %d item" % (self.config.filename, miss_count)) return # expand if self.config.expand: responses = [self.expand_dict(i, max_expand=self.config.expand) for i in responses] else: responses = [i for i in responses] if isinstance(responses, types.GeneratorType) else responses # headers if not self.f_csv: if not self.headers: self.headers = self.generate_headers(responses) self.f_csv = csv.DictWriter(self.f_out, self.headers) self.f_csv.writeheader() 
# encoding process for each_response in responses: for k, v in each_response.items(): if v is None: each_response[k] = "" elif self.config.qsn and v != "" and (isinstance(v, (int, float)) or isinstance(v, str) and all(i.isdigit() for i in v)): each_response[k] = repr(str(v)) elif self.config.encoding not in ("utf8", "utf-8"): each_response[k] = str(v).encode(self.config.encoding, "ignore").decode(self.config.encoding) self.success_count += 1 self.f_csv.writerow(each_response) logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count)) @staticmethod def generate_headers(responses): headers = set() for r in responses: for key in r.keys(): headers.add(key) return list(headers) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.f_out.close() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.filename, self.total_miss_count, self.success_count)) PK]Lы 5idataapi_transform/DataProcess/DataWriter/ESWriter.pyimport logging import asyncio import random from .BaseWriter import BaseWriter from ..Config.MainConfig import main_config class ESWriter(BaseWriter): def __init__(self, config): if not main_config.has_es_configured: raise ValueError("You must config es_hosts before using ESWriter, Please edit configure file: %s" % (main_config.ini_path, )) super().__init__() self.config = config self.total_miss_count = 0 self.success_count = 0 self.fail_count = 0 async def write(self, responses): response = None # something to return origin_length = len(responses) if self.config.filter: responses = [self.config.filter(i) for i in responses] responses = [i for i in responses if i] miss_count = origin_length - len(responses) self.total_miss_count += miss_count if responses: if self.config.expand: responses = [self.expand_dict(i) for i in responses] try_time = 0 while try_time < self.config.max_retry: success, fail, response = await self.config.es_client.add_dict_to_es( self.config.indices, self.config.doc_type, responses, self.config.id_hash_func, self.config.app_code, self.config.actions, self.config.create_date, self.config.error_if_fail, self.config.timeout, self.config.auto_insert_createDate) if response is not None: self.success_count += success self.fail_count += fail logging.info("Write %d items to index: %s, doc_type: %s, fail: %d, filtered: %d" % ( len(responses), self.config.indices, self.config.doc_type, fail, miss_count)) break else: # exception happened try_time += 1 if try_time >= self.config.max_retry: logging.error("Fail to write after try: %d times, Write 0 items to index: %s, doc_type: %s" % (self.config.max_retry, self.config.indices, self.config.doc_type)) else: await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) else: # all filtered, or pass empty result logging.info("Write 0 items to index: %s, doc_type: %s (all filtered, or pass empty result)" % (self.config.indices, self.config.doc_type)) return response async def delete_all(self, body=None): """ inefficient delete """ if not body: body = { "query": { "match_all": {} } } result = await self.config.es_client.delete_by_query(index=self.config.indices, doc_type=self.config.doc_type, body=body, params={"conflicts": "proceed"}) return result def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): logging.info("%s->%s write done, total filtered %d item, total write %d item, total fail: %d item" % (self.config.indices, self.config.doc_type, self.total_miss_count, 
self.success_count, self.fail_count)) PKU+L ??7idataapi_transform/DataProcess/DataWriter/JsonWriter.pyimport json import logging from .BaseWriter import BaseWriter class JsonWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.total_miss_count = 0 self.success_count = 0 self.f_out = open(self.config.filename, self.config.mode, encoding=self.config.encoding) def write(self, responses): miss_count = 0 for each_response in responses: if self.config.expand: each_response = self.expand_dict(each_response, max_expand=self.config.expand) if self.config.filter: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue self.f_out.write(json.dumps(each_response) + self.config.new_line) self.success_count += 1 self.total_miss_count += miss_count logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count)) def __exit__(self, exc_type, exc_val, exc_tb): self.f_out.close() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.filename, self.total_miss_count, self.success_count)) def __enter__(self): return self PKRPL"p9k k 8idataapi_transform/DataProcess/DataWriter/MongoWriter.pyimport json import asyncio import random import logging import traceback from .BaseWriter import BaseWriter InsertOne = DeleteMany = ReplaceOne = UpdateOne = None try: from pymongo import InsertOne, DeleteMany, ReplaceOne, UpdateOne except Exception: pass class MongoWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.total_miss_count = 0 self.success_count = 0 self.table_checked = False self.key_fields = list() async def write(self, responses): self.config.get_mongo_cli() # init mysql pool miss_count = 0 original_length = len(responses) if self.config.filter: target_responses = list() for i in responses: i = self.config.filter(i) if i: target_responses.append(i) else: miss_count += 1 responses = target_responses if not responses: self.finish_once(miss_count, original_length) return # After filtered, still have responses to write if await self.perform_write(responses): self.finish_once(miss_count, original_length) def __exit__(self, exc_type, exc_val, exc_tb): logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.name, self.total_miss_count, self.success_count)) def __enter__(self): return self def finish_once(self, miss_count, original_length): self.total_miss_count += miss_count self.success_count += original_length logging.info("%s write %d item, filtered %d item" % (self.config.name, original_length - miss_count, miss_count)) async def perform_write(self, responses): try_time = 0 for each in responses: if self.config.auto_insert_createDate and self.config.createDate is not None: each["createDate"] = self.config.createDate if "_id" not in each: each["_id"] = self.config.id_hash_func(each) while try_time < self.config.max_retry: try: if UpdateOne is not None: await self.config.collection_cli.bulk_write([UpdateOne({'_id': each["_id"]}, {"$set": each}, upsert=True) for each in responses]) else: bulk = self.config.collection_cli.initialize_ordered_bulk_op() for each in responses: bulk.find({"_id": each["_id"]}).upsert().replace_one(each) await bulk.execute() return True except Exception as e: try_time += 1 if try_time < self.config.max_retry: logging.error("retry: %d, %s" % (try_time, str(e))) await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) else: 
logging.error("Give up MongoWriter writer: %s, After retry: %d times, still fail to write, " "total write %d items, total filtered: %d items, reason: %s" % (self.config.name, self.config.max_retry, self.success_count, self.total_miss_count, str(traceback.format_exc()))) return False PKL5:r8idataapi_transform/DataProcess/DataWriter/MySQLWriter.pyimport json import asyncio import random import logging import traceback from .BaseWriter import BaseWriter class MySQLWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.total_miss_count = 0 self.success_count = 0 self.table_checked = False self.key_fields = list() async def write(self, responses): await self.config.get_mysql_pool_cli() # init mysql pool miss_count = 0 original_length = len(responses) if self.config.filter: target_responses = list() for i in responses: i = self.config.filter(i) if i: target_responses.append(i) else: miss_count += 1 responses = target_responses if not responses: self.finish_once(miss_count, original_length) return # After filtered, still have responses to write if not self.table_checked: await self.table_check(responses) if await self.perform_write(responses): self.finish_once(miss_count, original_length) def __exit__(self, exc_type, exc_val, exc_tb): self.config.free_resource() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.name, self.total_miss_count, self.success_count)) def __enter__(self): return self def finish_once(self, miss_count, original_length): self.total_miss_count += miss_count self.success_count += original_length logging.info("%s write %d item, filtered %d item" % (self.config.name, original_length - miss_count, miss_count)) async def table_check(self, responses): await self.config.cursor.execute("SHOW TABLES LIKE '%s'" % (self.config.table, )) result = await self.config.cursor.fetchone() if result is None: await self.create_table(responses) # check field await self.config.cursor.execute("DESC %s" % (self.config.table, )) results = await self.config.cursor.fetchall() fields = set(i[0] for i in results) self.key_fields = list(i[0] for i in results) real_keys = set(responses[0].keys()) difference_set = real_keys.difference(fields) if difference_set: # real keys not subset of fields raise ValueError("Field %s not in MySQL Table: %s" % (str(difference_set), self.config.table)) self.table_checked = True async def create_table(self, responses): test_response = dict() for response in responses[:50]: for k, v in response.items(): if k not in test_response: test_response[k] = v elif test_response[k] is None: test_response[k] = v elif isinstance(v, dict) or isinstance(v, list): if len(json.dumps(test_response[k])) < len(json.dumps(v)): test_response[k] = v elif v is not None and test_response[k] < v: test_response[k] = v sql = """ CREATE TABLE `%s` ( """ % (self.config.table, ) first_field = True for key, value in responses[0].items(): if "Count" in key: field_type = "BIGINT" elif value is None: field_type = "TEXT" elif key in ("content", ) or isinstance(value, dict) or isinstance(value, list): field_type = "TEXT" elif isinstance(value, bool): field_type = "BOOLEAN" elif isinstance(value, int): field_type = "BIGINT" elif isinstance(value, float): field_type = "DOUBLE" # varchar can store at most 65536 bytes, utf8 occupy 1-8 bytes per character, # so length should be less than 65536 / 8 = 8192 # assume this field (the shortest length) * 4 <= the longest length(8192) elif len(value) > 2048: field_type = "TEXT" else: length = 
len(value) * 4 if length < 256: length = 256 field_type = "VARCHAR(%d)" % (length, ) sql += ("\t" if first_field else "\t\t") + "`%s` %s" % (key, field_type) if key == "id": sql += " NOT NULL,\n" else: sql += ",\n" if first_field: first_field = False tail_sql = """ \tPRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=%s """ % (self.config.encoding, ) sql += tail_sql logging.info("Creating table: %s\n%s", self.config.table, sql) await self.config.cursor.execute(sql) await self.config.connection.commit() logging.info("table created") async def perform_write(self, responses): sql = "REPLACE INTO %s VALUES " % (self.config.table, ) for each in responses: curr_sql = '(' for field in self.key_fields: val = each[field] if isinstance(val, dict) or isinstance(val, list): val = json.dumps(val) if val is None: curr_sql += 'NULL,' else: curr_sql += repr(val) + "," curr_sql = curr_sql[:-1] + '),\n' sql += curr_sql sql = sql[:-2] try_time = 0 while try_time < self.config.max_retry: try: await self.config.cursor.execute(sql.encode(self.config.encoding)) await self.config.cursor.connection.commit() return True except Exception as e: try_time += 1 if try_time < self.config.max_retry: logging.error("retry: %d, %s" % (try_time, str(e))) await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) else: logging.error("Give up MySQL writer: %s, After retry: %d times, still fail to write, " "total write %d items, total filtered: %d items, reason: %s" % (self.config.name, self.config.max_retry, self.success_count, self.total_miss_count, str(traceback.format_exc()))) return False PKL!a 8idataapi_transform/DataProcess/DataWriter/RedisWriter.pyimport logging import asyncio import random import traceback import json import zlib from .BaseWriter import BaseWriter class RedisWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.total_miss_count = 0 self.success_count = 0 def encode(self, dict_object): string = json.dumps(dict_object) if self.config.compress: string = zlib.compress(string.encode(self.config.encoding)) return string async def write(self, responses): await self.config.get_redis_pool_cli() # init redis pool miss_count = 0 target_responses = list() for each_response in responses: if self.config.filter: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue target_responses.append(each_response) self.success_count += 1 self.total_miss_count += miss_count if target_responses: try_time = 0 while try_time < self.config.max_retry: try: if self.config.is_range: await self.config.redis_write_method(self.config.key, *(self.encode(i) for i in target_responses)) else: pipe_line = self.config.redis_pool_cli.pipeline() for each in responses: pipe_line.hset(self.config.key, each["id"], self.encode(each)) await pipe_line.execute() logging.info("%s write %d item, filtered %d item" % (self.config.name, len(responses), miss_count)) break except Exception as e: try_time += 1 if try_time >= self.config.max_retry: logging.error("Fail to write after try: %d times, Write 0 items to redis, " "filtered %d item before write, error: %s" % (self.config.max_retry, miss_count, str(traceback.format_exc()))) else: await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) else: logging.info("Write 0 items to %s, filtered: %d, (all filtered, or pass empty result)" % (self.config.name, miss_count)) def __exit__(self, exc_type, exc_val, exc_tb): logging.info("%s write done, total filtered 
%d item, total write %d item" % (self.config.name, self.total_miss_count, self.success_count)) def __enter__(self): return self PKU+LA\UU6idataapi_transform/DataProcess/DataWriter/TXTWriter.pyimport logging from .BaseWriter import BaseWriter class TXTWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.f_out = open(config.filename, config.mode, encoding=config.encoding) self.total_miss_count = 0 self.success_count = 0 def write(self, responses): miss_count = 0 for each_response in responses: if self.config.expand: each_response = self.expand_dict(each_response, max_expand=self.config.expand) if self.config.filter: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue self.f_out.write(self.config.join_val.join(str(value) for value in each_response.values()) + self.config.new_line) self.success_count += 1 self.total_miss_count += miss_count logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count)) def __exit__(self, exc_type, exc_val, exc_tb): self.f_out.close() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.filename, self.total_miss_count, self.success_count)) def __enter__(self): return self PKU+L97idataapi_transform/DataProcess/DataWriter/XLSXWriter.pyimport logging from openpyxl import Workbook from .BaseWriter import BaseWriter _warning = False class XLSXWriter(BaseWriter): def __init__(self, config): global _warning super().__init__() self.config = config self.wb = Workbook() self.ws1 = self.wb.active self.ws1.title = config.title self.col_dict = dict() self.row = 2 self.total_miss_count = 0 self.success_count = 0 if not _warning: logging.warning("XLSXWriter will actually write to file when __exit__ of XLSXWriter called") _warning = True def write(self, responses): miss_count = 0 for each_response in responses: if self.config.expand: each_response = self.expand_dict(each_response, max_expand=self.config.expand) if self.config.filter: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue for key, value in each_response.items(): if key not in self.col_dict: self.col_dict[key] = len(self.col_dict) + 1 self.ws1.cell(row=1, column=self.col_dict[key], value=key) value = str(value) if value is not None else "" self.ws1.cell(row=self.row, column=self.col_dict[key], value=value) self.row += 1 self.success_count += 1 logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count)) def __exit__(self, exc_type, exc_val, exc_tb): self.wb.save(filename=self.config.filename) self.wb.close() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.filename, self.total_miss_count, self.success_count)) def __enter__(self): return self PKK5idataapi_transform/DataProcess/DataWriter/__init__.pyPKKB6idataapi_transform/DataProcess/Meta/BaseDataProcess.py class BaseDataProcess(object): @staticmethod def expand_dict(origin_item, max_expand=0, current_expand=0, parent_key=None, parent_item=None): if max_expand == 0: return origin_item if max_expand != -1 and current_expand >= max_expand: return origin_item if parent_key: if isinstance(origin_item, dict): for sub_k, sub_v in origin_item.items(): parent_item[parent_key + "_" + sub_k] = sub_v if parent_key in parent_item: del parent_item[parent_key] elif isinstance(origin_item, list): for item in origin_item: BaseDataProcess.expand_dict(item, max_expand, current_expand + 1, 
parent_key, parent_item) return origin_item keys = [k for k in origin_item.keys()] has_sub_dict = False for k in keys: if isinstance(origin_item[k], dict): has_sub_dict = True sub_dict = origin_item[k] for sub_k, sub_v in sub_dict.items(): origin_item[k + "_" + sub_k] = sub_v del origin_item[k] elif isinstance(origin_item[k], list): for item in origin_item[k]: BaseDataProcess.expand_dict(item, max_expand, current_expand + 1, k, origin_item) if has_sub_dict: return BaseDataProcess.expand_dict(origin_item, max_expand, current_expand + 1) else: return origin_item
idataapi_transform/DataProcess/Meta/__init__.py
idataapi_transform-1.3.9.dist-info/LICENSE
The MIT License (MIT)

Copyright (c) 2017 zpoint

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
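The writer-side configs earlier in WriterConfig.py all document the same pattern: build a config object, hand it to ProcessFactory.create_writer, and await write() inside the writer's context manager. The sketch below is not part of the packaged sources; it mirrors the WMongoConfig docstring example. "my_coll" and the sample items are placeholders, MongoDB must already be set up in the package's configure file, and the plain with-statement is used here because MongoWriter defines a synchronous __enter__/__exit__ even though write() itself is a coroutine.

# Usage sketch, assuming MongoDB is configured in the package ini file and motor is installed.
import asyncio

from idataapi_transform import ProcessFactory, WriterConfig


async def write_sample_items():
    # placeholder items; MongoWriter fills in "_id" via id_hash_func when it is missing
    data = [{"id": "1", "title": "foo"}, {"id": "2", "title": "bar"}]
    mongo_config = WriterConfig.WMongoConfig("my_coll")  # placeholder collection name
    with ProcessFactory.create_writer(mongo_config) as mongo_writer:
        await mongo_writer.write(data)


if __name__ == "__main__":
    asyncio.get_event_loop().run_until_complete(write_sample_items())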
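BaseDataProcess.expand_dict, defined in Meta/BaseDataProcess.py above, flattens nested dictionaries by joining keys with an underscore, one level per pass, recursing until max_expand levels have been expanded (-1 means expand every level, 0 means return the item unchanged). A small standalone check of that behaviour, assuming the package is importable under the layout shown in this archive:

# Sketch exercising BaseDataProcess.expand_dict; expected outputs follow from the code above.
from idataapi_transform.DataProcess.Meta.BaseDataProcess import BaseDataProcess

# expand one level: prints {'d': 2, 'a_b': {'c': 1}}
print(BaseDataProcess.expand_dict({"a": {"b": {"c": 1}}, "d": 2}, max_expand=1))

# expand every level (-1): prints {'d': 2, 'a_b_c': 1}
print(BaseDataProcess.expand_dict({"a": {"b": {"c": 1}}, "d": 2}, max_expand=-1))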