idataapi_transform/__init__.py"""convert data from one format to another format, read or write from file or database, suitable for iDataAPI""" from .cli import main __version__ = '1.0.2' idataapi_transform/cli.pyimport json import asyncio import argparse from .DataProcess.Config.DefaultValue import DefaultVal from .DataProcess.Config.ConfigUtil import GetterConfig from .DataProcess.Config.ConfigUtil import WriterConfig from .DataProcess.ProcessFactory import ProcessFactory class Args(object): from_choices = ["API", "ES", "CSV", "XLSX", "JSON"] from_desc = "argument 'from' can only be set to one of 'API', 'ES', 'CSV', 'XLSX', " \ "or 'JSON' (means a json line-by-line file)" to_choices = ["csv", "xlsx", "json", "txt", "es"] to_desc = "argument 'to' can only be set to one of \"csv\", \"xlsx\", \"json\", \"txt\" or \"es\". \"json\" will write "\ "'json.dumps(item)' line by line. \"txt\" will write each item line by line, each element in each line " \ "is separated by 'space' by default" source_desc = """ argument 'source', When argument 'from' set to 'ES', source should be 'index:doc_type' When argument 'from' set to 'API', source should be 'http://...' When argument 'from' set to others, source should be a file path """ dest_desc = "argument 'dest', filename to save result, no need for suffix, " \ "i.e. '/Desktop/result', default: './result'\n" \ "When argument 'to' set to 'ES', dest should be 'index:doc_type'" per_limit_desc = "amount of data buffered, when the buffer is filled, the program will write the buffered data to 'dest', default 100" max_limit_desc = "write at most 'max_limit' items to 'dest', if 'max_limit' set to 0, means no limit, default None" retry_desc = "when fetching data fails, retry at most 'max_retry' times, default 3" r_encoding_desc = "encoding of input file, ignored for xlsx format, default 'utf8'" w_encoding_desc = "encoding of output file, ignored for xlsx format, default 'utf8'" filter_desc = "path of a file that contains a 'my_filter(item)' function used to filter items" param_file_desc = """When you have many items saved in id.json, --param_file './id.json::id::pid' means open './id.json', read each json object line by line, use each_json['id'] as the parameter 'pid' and add it to the tail part of 'source'. --param_file can be either "filename.json::json_param::request_param" or "filename.txt::request_param" """ expand_desc = """If your item is {"a": {"b": "c"}, "b": "d"}, --expand 1 will make your item become {"a_b": "c", "b": "d"}, --expand N means expand at most N levels deep of your object, --expand -1 means expand all levels, --expand 0 means no expansion of your item. Default 0. 
""" qsn_desc = """quote scientific notation, ie: 4324234234234234123123 will become 4.32423423423423E+021 in normal csv, If quote like '4324234234234234123123', it won't become scientific notation, Only work for output format 'csv' --qsn True means quote scientific notation, --qsn False means not quote scientific notation""" query_body_desc = """ElasticSearch query body, size has same function as "--limit", i.e: body = { "size": 100, "_source": { "includes": ["location", "title", "city", "id"] }, "query": { "bool": { "must": [ { "term": {"appCode": {"value": "ctrip"}} } ] } } } """ write_mode_desc = """'w' or 'a+'""" getter_config_map = { Args.from_choices[0]: GetterConfig.RAPIConfig, Args.from_choices[1]: GetterConfig.RESConfig, Args.from_choices[2]: GetterConfig.RCSVConfig, Args.from_choices[3]: GetterConfig.RXLSXConfig, Args.from_choices[4]: GetterConfig.RJsonConfig } writer_config_map = { Args.to_choices[0]: WriterConfig.WCSVConfig, Args.to_choices[1]: WriterConfig.WXLSXConfig, Args.to_choices[2]: WriterConfig.WJsonConfig, Args.to_choices[3]: WriterConfig.WJsonConfig, Args.to_choices[4]: WriterConfig.WESConfig } def get_arg(): parser = argparse.ArgumentParser(prog="idataapi_transform", description='convert data from a format to another format, ' 'read/write from file or database, suitable for iDataAPI') parser.add_argument("from", choices=Args.from_choices, help=Args.from_desc, type=str.upper) parser.add_argument("to", choices=Args.to_choices, help=Args.to_desc, type=str.lower) parser.add_argument("source", help=Args.source_desc) parser.add_argument("dest", help=Args.dest_desc, default=DefaultVal.dest, nargs="?") parser.add_argument("--per_limit", default=DefaultVal.per_limit, type=int, help=Args.per_limit_desc) parser.add_argument("--max_limit", default=DefaultVal.max_limit, type=int, help=Args.max_limit_desc) parser.add_argument("--max_retry", default=DefaultVal.max_retry, type=int, help=Args.retry_desc) parser.add_argument("--r_encoding", default=DefaultVal.default_encoding, help=Args.r_encoding_desc) parser.add_argument("--w_encoding", default=DefaultVal.default_encoding, help=Args.w_encoding_desc) parser.add_argument("--filter", default=None, help=Args.filter_desc) parser.add_argument("--expand", default=None, type=int, help=Args.expand_desc) parser.add_argument("--qsn", default=None, type=bool, help=Args.qsn_desc) parser.add_argument("--query_body", default=DefaultVal.query_body, type=str, help=Args.query_body_desc) parser.add_argument("--write_mode", default=DefaultVal.default_file_mode_w, type=str, help=Args.write_mode_desc) return parser.parse_args() def get_filter(filter_file): if not filter_file: return None with open(filter_file, "r") as f: exec(f.read()) func = locals()["my_filter"] return func async def getter_to_writer(getter, writer): with writer as safe_writer: async for items in getter: if asyncio.iscoroutinefunction(safe_writer.write): await safe_writer.write(items) else: safe_writer.write(items) def main(): args = get_arg() from_ = getattr(args, "from") from_args = list() from_kwargs = dict() to_args = list() to_kwargs = dict() if from_ != Args.from_choices[0]: # not api from_args.extend(args.source.split(":")) else: from_args.extend([args.source]) from_kwargs["encoding"] = args.r_encoding if args.query_body: try: from_kwargs["query_body"] = json.loads(args.query_body) except Exception as e: raise SyntaxError("--query_body must be json serialized") for key in ("per_limit", "max_limit", "max_retry"): from_kwargs[key] = getattr(args, key) to_kwargs["filter_"] = 
get_filter(args.filter) to_kwargs["encoding"] = args.w_encoding to_kwargs["mode"] = args.write_mode for key in ("max_retry", "expand", "qsn"): to_kwargs[key] = getattr(args, key) if from_ not in getter_config_map: raise ValueError("argument from must be in %s" % (str(Args.from_choices), )) getter_config = getter_config_map[from_](*from_args, **from_kwargs) getter = ProcessFactory.create_getter(getter_config) if args.to != Args.to_choices[-1]: dest = args.dest + "." + args.to to_args.append(dest) else: indices, doc_type = args.dest.split(":") to_args.append(indices) to_args.append(doc_type) writer_config = writer_config_map[args.to](*to_args, **to_kwargs) writer = ProcessFactory.create_writer(writer_config) loop = asyncio.get_event_loop() loop.run_until_complete(getter_to_writer(getter, writer)) if __name__ == "__main__": main() PKn-Lr}s> > 0idataapi_transform/DataProcess/ProcessFactory.py# config from .Config.MainConfig import main_config from .Config.ConfigUtil import GetterConfig from .Config.ConfigUtil import WriterConfig from .DataGetter.ESGetter import ESScrollGetter from .DataGetter.CSVGetter import CSVGetter from .DataGetter.APIGetter import APIGetter, APIBulkGetter from .DataGetter.JsonGetter import JsonGetter from .DataGetter.XLSXGetter import XLSXGetter from .DataWriter.CSVWriter import CSVWriter from .DataWriter.ESWriter import ESWriter from .DataWriter.JsonWriter import JsonWriter from .DataWriter.TXTWriter import TXTWriter from .DataWriter.XLSXWriter import XLSXWriter class ProcessFactory(object): @staticmethod def create_getter(config): """ create a getter based on config :return: getter """ if isinstance(config, GetterConfig.RAPIConfig): return APIGetter(config) elif isinstance(config, GetterConfig.RCSVConfig): return CSVGetter(config) elif isinstance(config, GetterConfig.RESConfig): return ESScrollGetter(config) elif isinstance(config, GetterConfig.RJsonConfig): return JsonGetter(config) elif isinstance(config, GetterConfig.RXLSXConfig): return XLSXGetter(config) elif isinstance(config, GetterConfig.RAPIBulkConfig): return APIBulkGetter(config) else: raise ValueError("create_getter must pass one of the instance of [RAPIConfig, RCSVConfig, " "RESConfig, RJsonConfig, RXLSXConfig, RAPIBulkConfig]") @staticmethod def create_writer(config): """ create a writer based on config :return: a writer """ if isinstance(config, WriterConfig.WCSVConfig): return CSVWriter(config) elif isinstance(config, WriterConfig.WESConfig): return ESWriter(config) elif isinstance(config, WriterConfig.WJsonConfig): return JsonWriter(config) elif isinstance(config, WriterConfig.WTXTConfig): return TXTWriter(config) elif isinstance(config, WriterConfig.WXLSXConfig): return XLSXWriter(config) else: raise ValueError("create_writer must pass one of the instance of [WCSVConfig, WESConfig, WJsonConfig, " "WTXTConfig, WXLSXConfig]") PK狗K*idataapi_transform/DataProcess/__init__.pyPKmUKN]8idataapi_transform/DataProcess/Config/ConnectorConfig.pyimport aiohttp import asyncio from .MainConfig import main_config main_config = main_config() default_concurrency_limit = main_config["main"].getint("concurrency") class _SessionManger(object): def __init__(self, concurrency_limit=default_concurrency_limit, loop=None): self.session = self._generate_session(concurrency_limit=concurrency_limit, loop=loop) @staticmethod def _generate_connector(limit=default_concurrency_limit, loop=None): """ https://github.com/KeepSafe/aiohttp/issues/883 if connector is passed to session, it is not available anymore """ if not loop: loop = 
asyncio.get_event_loop() return aiohttp.TCPConnector(limit=limit, loop=loop) @staticmethod def _generate_session(concurrency_limit=default_concurrency_limit, loop=None): if not loop: loop = asyncio.get_event_loop() return aiohttp.ClientSession(connector=_SessionManger._generate_connector(limit=concurrency_limit, loop=loop), loop=loop) def get_session(self): return self.session def __del__(self): self.session.close() session_manger = _SessionManger() PKxO$L\ws5idataapi_transform/DataProcess/Config/DefaultValue.pyimport os from .MainConfig import main_config base_config = main_config() class DefaultVal(object): per_limit = base_config["main"].getint("per_limit") max_limit = base_config["main"].get("max_limit") if max_limit != "None": max_limit = int(max_limit) else: max_limit = None max_retry = base_config["main"].getint("max_retry") random_min_sleep = base_config["main"].getint("random_min_sleep") random_max_sleep = base_config["main"].getint("random_max_sleep") default_file_mode_r = "r" default_file_mode_w = "w" default_encoding = "utf8" new_line = "\n" join_val = " " title = "example" qsn = None query_body = None dest = os.getcwd() + "/result" interval = 5 concurrency = 50 PKms9LjE""1idataapi_transform/DataProcess/Config/ESConfig.pyimport asyncio import aiohttp import hashlib import json import time import logging import copy from elasticsearch_async.connection import AIOHttpConnection as OriginAIOHttpConnection from elasticsearch_async.transport import AsyncTransport as OriginAsyncTransport from elasticsearch_async.transport import ensure_future from elasticsearch import TransportError from aiohttp.client_exceptions import ServerFingerprintMismatch from elasticsearch.exceptions import ConnectionError, ConnectionTimeout, SSLError from elasticsearch.compat import urlencode from elasticsearch_async import AsyncTransport from elasticsearch_async import AsyncElasticsearch es_hosts = None def init_es(hosts, es_headers, timeout_): global es_hosts, AsyncElasticsearch, AsyncTransport es_hosts = hosts if not es_hosts: return False class MyAsyncTransport(OriginAsyncTransport): """ Override default AsyncTransport to add timeout """ def perform_request(self, method, url, params=None, body=None, timeout=None, headers=None): if body is not None: body = self.serializer.dumps(body) # some clients or environments don't support sending GET with body if method in ('HEAD', 'GET') and self.send_get_body_as != 'GET': # send it as post instead if self.send_get_body_as == 'POST': method = 'POST' # or as source parameter elif self.send_get_body_as == 'source': if params is None: params = {} params['source'] = body body = None if body is not None: try: body = body.encode('utf-8') except (UnicodeDecodeError, AttributeError): # bytes/str - no need to re-encode pass ignore = () if params: ignore = params.pop('ignore', ()) if isinstance(ignore, int): ignore = (ignore,) return ensure_future(self.main_loop(method, url, params, body, ignore=ignore, timeout=timeout, headers=headers), loop=self.loop) @asyncio.coroutine def main_loop(self, method, url, params, body, ignore=(), timeout=None, headers=None): for attempt in range(self.max_retries + 1): connection = self.get_connection() try: status, headers, data = yield from connection.perform_request( method, url, params, body, ignore=ignore, timeout=timeout, headers=headers) except TransportError as e: if method == 'HEAD' and e.status_code == 404: return False retry = False if isinstance(e, ConnectionTimeout): retry = self.retry_on_timeout elif isinstance(e, 
ConnectionError): retry = True elif e.status_code in self.retry_on_status: retry = True if retry: # only mark as dead if we are retrying self.mark_dead(connection) # raise exception on last retry if attempt == self.max_retries: raise else: raise else: if method == 'HEAD': return 200 <= status < 300 # connection didn't fail, confirm it's live status self.connection_pool.mark_live(connection) if data: data = self.deserializer.loads(data, headers.get('content-type')) return data class AIOHttpConnection(OriginAIOHttpConnection): """ Override default AIOHttpConnection.perform_request to add headers """ @asyncio.coroutine def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=(), headers=None): url_path = url if params: url_path = '%s?%s' % (url, urlencode(params or {})) url = self.base_url + url_path start = self.loop.time() response = None local_headers = es_headers if headers: local_headers = copy.deepcopy(es_headers) if es_headers else dict() local_headers.update(headers) try: with aiohttp.Timeout(timeout or timeout_ or self.timeout): response = yield from self.session.request(method, url, data=body, headers=local_headers) raw_data = yield from response.text() duration = self.loop.time() - start except Exception as e: self.log_request_fail(method, url, url_path, body, self.loop.time() - start, exception=e) if isinstance(e, ServerFingerprintMismatch): raise SSLError('N/A', str(e), e) if isinstance(e, asyncio.TimeoutError): raise ConnectionTimeout('TIMEOUT', str(e), e) raise ConnectionError('N/A', str(e), e) finally: if response is not None: yield from response.release() # raise errors based on http status codes, let the client handle those if needed if not (200 <= response.status < 300) and response.status not in ignore: self.log_request_fail(method, url, url_path, body, duration, status_code=response.status, response=raw_data) self._raise_error(response.status, raw_data) self.log_request_success(method, url, url_path, body, response.status, raw_data, duration) return response.status, response.headers, raw_data class MyAsyncElasticsearch(AsyncElasticsearch): @staticmethod def default_id_hash_func(item): if "appCode" in item and item["appCode"] and "id" in item and item["id"]: value = (item["appCode"] + "_" + item["id"]).encode("utf8") else: value = str(item).encode("utf8") if "createDate" not in item: item["createDate"] = int(time.time()) return hashlib.md5(value).hexdigest() async def add_dict_to_es(self, indices, doc_type, items, id_hash_func=None, app_code=None, actions=None, create_date=None, error_if_fail=True, timeout=None): if not actions: actions = "index" if not id_hash_func: id_hash_func = self.default_id_hash_func body = "" for item in items: if app_code: item["appCode"] = app_code if create_date: item["createDate"] = create_date action = { actions: { "_index": indices, "_type": doc_type, "_id": id_hash_func(item) } } body += json.dumps(action) + "\n" + json.dumps(item) + "\n" try: success = fail = 0 r = await self.transport.perform_request("POST", "/_bulk?pretty", body=body, timeout=timeout) if r["errors"]: for item in r["items"]: for k, v in item.items(): if "error" in v: if error_if_fail: # log error logging.error(json.dumps(v["error"])) fail += 1 else: success += 1 else: success = len(r["items"]) return success, fail, r except Exception as e: import traceback logging.error(traceback.format_exc()) logging.error("elasticsearch Exception, give up: %s" % (str(e), )) return None, None, None OriginAIOHttpConnection.perform_request = 
AIOHttpConnection.perform_request OriginAsyncTransport.perform_request = MyAsyncTransport.perform_request OriginAsyncTransport.main_loop = MyAsyncTransport.main_loop AsyncElasticsearch = MyAsyncElasticsearch return True def get_es_client(): return AsyncElasticsearch(hosts=es_hosts) PK⌞K S2idataapi_transform/DataProcess/Config/LogConfig.pyimport os import logging from logging.handlers import RotatingFileHandler format_str = "%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s" date_formatter_str = '[%Y-%m-%d %H:%M:%S]' formatter = logging.Formatter(format_str, datefmt=date_formatter_str) class SingleLevelFilter(logging.Filter): def __init__(self, passlevel, reject): super(SingleLevelFilter, self).__init__() self.passlevel = passlevel self.reject = reject def filter(self, record): if self.reject: return record.levelno != self.passlevel else: return record.levelno == self.passlevel def init_log(log_dir, max_log_file_bytes, ini_path): root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) # console console = logging.StreamHandler() console.setFormatter(formatter) root_logger.addHandler(console) if log_dir: if not os.path.exists(log_dir): logging.error("log_dir(%s) in configure file(%s) not exists, I will not log to file" % (log_dir, ini_path)) return False if not max_log_file_bytes: logging.error("log_byte not set, please configure log_byte in configure file(%s), " "or I will not log to file" % (ini_path, )) return False # info h1 = RotatingFileHandler("%s/info.log" % (log_dir, ), mode="a", maxBytes=max_log_file_bytes, encoding="utf8", backupCount=1) h1.setFormatter(formatter) f1 = SingleLevelFilter(logging.INFO, False) h1.addFilter(f1) root_logger.addHandler(h1) # error h1 = RotatingFileHandler("%s/error.log" % (log_dir, ), mode="a", maxBytes=max_log_file_bytes, encoding="utf8", backupCount=1) h1.setFormatter(formatter) f1 = SingleLevelFilter(logging.ERROR, False) h1.addFilter(f1) root_logger.addHandler(h1) return True return False PK5c#Leܒ 3idataapi_transform/DataProcess/Config/MainConfig.pyimport os import json import configparser from os.path import expanduser from .LogConfig import init_log from .ESConfig import init_es default_configure_content = """ [main] # default max concurrency value for APIGetter concurrency = 50 # buffer size per_limit = 100 # fetch at most max_limit items max_limit = None # max retry for getter before give up if fail to get data max_retry = 3 # sleep interval if fail random_min_sleep = 1 random_max_sleep = 3 [es] # elasticsearch host # hosts = ["localhost:9393"] # elasticsearch headers when perform http request # headers = {"Host": "localhost", "value": "value"} # request timeout, seconds # timeout = 10 [log] # a directory to save log file # path = /Users/zpoint/Desktop/idataapi-transform/logs/ # max byte per log file # log_byte = 5242880 """ class MainConfig(object): def __init__(self, ini_path=None): # singleton if not hasattr(self, "__instance"): self.ini_path = ini_path if not self.ini_path: home = expanduser("~") self.ini_path = home + "/idataapi-transform.ini" if not os.path.exists(self.ini_path): with open(self.ini_path, "w") as f: f.write(default_configure_content) self.__instance = configparser.ConfigParser() self.__instance.read(self.ini_path) MainConfig.__instance = self.__instance self.has_log_file = self.config_log() self.has_es_configured = self.config_es() def __call__(self): return self.__instance def config_log(self): max_log_file_bytes = self.__instance["log"].getint("log_byte") log_path = 
self.__instance["log"].get("path") return init_log(log_path, max_log_file_bytes, self.ini_path) def config_es(self): hosts = self.__instance["es"].get("hosts") timeout = self.__instance["es"].getint("timeout") if hosts: try: hosts = json.loads(hosts) except Exception as e: raise ValueError("es host must be json serialized") headers = self.__instance["es"].get("headers") if headers and headers != "None": try: headers = json.loads(headers) except Exception as e: raise ValueError("es headers must be json serialized") else: headers = None return init_es(hosts, headers, timeout) main_config = MainConfig() PK VK1idataapi_transform/DataProcess/Config/__init__.pyPKZK@>idataapi_transform/DataProcess/Config/ConfigUtil/BaseConfig.pyimport abc class BaseGetterConfig(object, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): pass class BaseWriterConfig(object, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): pass PK1LGR;&;&@idataapi_transform/DataProcess/Config/ConfigUtil/GetterConfig.pyfrom .BaseConfig import BaseGetterConfig from ..ESConfig import get_es_client from ..DefaultValue import DefaultVal from ..ConnectorConfig import session_manger, main_config class RAPIConfig(BaseGetterConfig): def __init__(self, source, per_limit=DefaultVal.per_limit, max_limit=DefaultVal.max_limit, max_retry=DefaultVal.max_retry, random_min_sleep=DefaultVal.random_min_sleep, random_max_sleep=DefaultVal.random_max_sleep, session=None, filter_=None, *args, **kwargs): """ will request until no more next_page to get, or get "max_limit" items :param source: API to get, i.e. "http://..." :param per_limit: how many items to get per time :param max_limit: get at most max_limit items, if not set, get all :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param session: aiohttp session to perform request :param filter_: run "transform --help" to see command line interface explanation for detail :param args: :param kwargs: Example: api_config = RAPIConfig("http://...") api_getter = ProcessFactory.create_getter(api_config) async for items in api_getter: print(items) """ super().__init__() self.source = source self.per_limit = per_limit self.max_limit = max_limit self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.session = session_manger.get_session() if not session else session self.filter = filter_ class RCSVConfig(BaseGetterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_r, encoding=DefaultVal.default_encoding, per_limit=DefaultVal.per_limit, max_limit=DefaultVal.max_limit, filter_=None, **kwargs): """ :param filename: filename to read :param mode: file open mode, i.e "r" :param encoding: file encoding i.e "utf8" :param per_limit: how many items to get per time :param max_limit: get at most max_limit items, if not set, get all :param filter_: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: csv_config = RJsonConfig("./result.csv", encoding="gbk") csv_getter = ProcessFactory.create_getter(csv_config) async for items in csv_getter: print(items) # both async generator and generator implemented for items in csv_getter: print(items) """ super().__init__() self.filename = filename self.mode = mode self.encoding = 
encoding self.per_limit = per_limit self.max_limit = max_limit self.filter = filter_ class RESConfig(BaseGetterConfig): def __init__(self, indices, doc_type, per_limit=DefaultVal.per_limit, max_limit=DefaultVal.max_limit, scroll="1m", query_body=None, return_source=True, max_retry=DefaultVal.max_retry, random_min_sleep=DefaultVal.random_min_sleep, random_max_sleep=DefaultVal.random_max_sleep, filter_=None, **kwargs): """ :param indices: elasticsearch indices :param doc_type: elasticsearch doc_type :param per_limit: how many items to get per request :param max_limit: get at most max_limit items, if not set, get all :param scroll: default is "1m" :param query_body: default is '{"size": "per_limit", "query": {"match_all": {}}}' :param return_source: if set to True, will return [item , ..., itemN], item is the "_source" object if set to False, will return whatever elasticsearch return, i.e {"hits": {"total": ...}} :param max_retry: if request fail, retry max_retry times :param random_min_sleep: if request fail, random sleep at least random_min_sleep seconds before request again :param random_max_sleep: if request fail, random sleep at most random_min_sleep seconds before request again :param filter_: run "transform --help" to see command line interface explanation for detail, only work if return_source is False :param kwargs: Example: body = { "size": 100, "_source": { "includes": ["likeCount", "id", "title"] } } es_config = RESConfig("post20170630", "news", max_limit=1000, query_body=body) es_getter = ProcessFactory.create_getter(es_config) async for items in es_getter: print(item) """ super().__init__() if not query_body: query_body = { "size": per_limit, "query": { "match_all": {} } } self.query_body = query_body self.indices = indices self.doc_type = doc_type self.per_limit = per_limit self.max_limit = max_limit self.scroll = scroll self.es_client = get_es_client() self.return_source = return_source self.max_retry = max_retry self.random_min_sleep = random_min_sleep self.random_max_sleep = random_max_sleep self.filter = filter_ def __del__(self): self.es_client.transport.close() class RJsonConfig(BaseGetterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_r, encoding=DefaultVal.default_encoding, per_limit=DefaultVal.per_limit, max_limit=DefaultVal.max_limit, filter_=None, **kwargs): """ :param filename: line by line json file to read :param mode: file open mode, i.e "r" :param encoding: file encoding i.e "utf8" :param per_limit: how many items to get per time :param max_limit: get at most max_limit items, if not set, get all :param filter_: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: json_config = RJsonConfig("./result.json") json_getter = ProcessFactory.create_getter(json_config) async for items in json_getter: print(items) # both async generator and generator implemented for items in json_getter: print(items) """ super().__init__() self.filename = filename self.mode = mode self.encoding = encoding self.per_limit = per_limit self.max_limit = max_limit self.filter = filter_ class RXLSXConfig(BaseGetterConfig): def __init__(self, filename, per_limit=DefaultVal.per_limit, max_limit=DefaultVal.max_limit, sheet_index=0, filter_=None, **kwargs): """ :param filename: filename to read :param per_limit: how many items to get per time :param max_limit: get at most max_limit items, if not set, get all :param sheet_index: which sheet to get, 0 means 0th sheet :param filter_: run "transform --help" to see command line 
interface explanation for detail :param kwargs: Example: xlsx_config = RXLSXConfig("./result.xlsx") xlsx_getter = ProcessFactory.create_getter(xlsx_config) async for items in xlsx_getter: print(items) # both async generator and generator implemented for items in xlsx_getter: print(items) """ super().__init__() self.filename = filename self.per_limit = per_limit self.max_limit = max_limit self.sheet_index = sheet_index self.filter = filter_ class RAPIBulkConfig(BaseGetterConfig): def __init__(self, sources, interval=DefaultVal.interval, concurrency=main_config["main"].getint("concurrency"), filter_=None, **kwargs): """ :param sources: an iterable object, each item must be a url string or an instance of RAPIConfig :param interval: integer or float, each time you call the async generator, you will wait for "interval" seconds and get all items fetched during this "interval" :param concurrency: how many tasks run concurrently, default read from the config file; if concurrency is set, only url strings in "sources" will work with this concurrency level, RAPIConfig instances won't :param filter_: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: sources = ["http://....", "http://....", "http://....", RAPIConfig("http://....")] bulk_config = RAPIBulkConfig(sources) bulk_getter = ProcessFactory.create_getter(bulk_config) async for items in bulk_getter: print(items) """ super().__init__() self.configs = (self.to_config(i) for i in sources) self.interval = interval self.concurrency = concurrency self.session = session_manger._generate_session(concurrency_limit=concurrency) self.filter = filter_ def to_config(self, item): if isinstance(item, RAPIConfig): return item else: return RAPIConfig(item, session=self.session, filter_=self.filter) def __del__(self): self.session.close() idataapi_transform/DataProcess/Config/ConfigUtil/WriterConfig.pyfrom .BaseConfig import BaseWriterConfig from ..ESConfig import get_es_client from ..DefaultValue import DefaultVal class WCSVConfig(BaseWriterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_w, encoding=DefaultVal.default_encoding, headers=None, filter_=None, expand=None, qsn=DefaultVal.qsn, **kwargs): """ :param filename: filename to write :param mode: file open mode, i.e "w" or "a+" :param encoding: file encoding i.e "utf8" :param headers: csv headers in the first row, if not set, automatically extracted from the first bulk of items :param filter_: run "transform --help" to see command line interface explanation for detail :param expand: run "transform --help" to see command line interface explanation for detail :param qsn: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: ... 
csv_config = WCSVConfig("./result.csv", encoding="utf8", headers=["likeCount", "id", "title"]) with ProcessFactory.create_writer(csv_config) as csv_writer: async for items in es_getter: # do whatever you want with items csv_writer.write(items) """ super().__init__() self.filename = filename self.encoding = encoding self.mode = mode self.headers = headers self.filter = filter_ self.expand = expand self.qsn = qsn class WESConfig(BaseWriterConfig): def __init__(self, indices, doc_type, filter_=None, expand=None, id_hash_func=None, appCode=None, actions=None, createDate=None, error_if_fail=True, timeout=None, **kwargs): """ :param indices: elasticsearch indices :param doc_type: elasticsearch doc_type :param filter_: run "transform --help" to see command line interface explanation for detail :param expand: run "transform --help" to see command line interface explanation for detail :param id_hash_func: function to generate the document '_id' for each item :param appCode: if not None, add appCode to each item before writing to es :param actions: if not None, set the action to the user-defined action, else the default action is 'index' :param createDate: if not None, add createDate to each item before writing to es :param error_if_fail: if True, log an error when an item fails to insert into es, else log nothing :param timeout: http connection timeout when connecting to es, in seconds :param kwargs: Example: ... es_config = WESConfig("post20170630", "news") with ProcessFactory.create_writer(es_config) as es_writer: # asyncio functions must be called with await await es_writer.write(items) """ super().__init__() self.indices = indices self.doc_type = doc_type self.filter = filter_ self.expand = expand self.id_hash_func = id_hash_func self.es_client = get_es_client() self.app_code = appCode self.actions = actions self.create_date = createDate self.error_if_fail = error_if_fail self.timeout = timeout def __del__(self): self.es_client.transport.close() class WJsonConfig(BaseWriterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_w, encoding=DefaultVal.default_encoding, expand=None, filter_=None, new_line=DefaultVal.new_line, **kwargs): """ :param filename: filename to write :param mode: file open mode, i.e "w" or "a+" :param encoding: file encoding i.e "utf8" :param expand: run "transform --help" to see command line interface explanation for detail :param filter_: run "transform --help" to see command line interface explanation for detail :param new_line: new_line separator for each item, default is "\n" :param kwargs: Example: ... 
json_config = WJsonConfig("./result.json") with ProcessFactory.create_writer(csv_config) as json_writer: async for items in es_getter: json_writer.write(items) """ super().__init__() self.filename = filename self.mode = mode self.encoding = encoding self.expand = expand self.filter = filter_ self.new_line = new_line class WTXTConfig(BaseWriterConfig): def __init__(self, filename, mode=DefaultVal.default_file_mode_w, encoding=DefaultVal.default_encoding, expand=None, filter_=None, new_line=DefaultVal.new_line, join_val=DefaultVal.join_val, **kwargs): """ :param filename: filename to write :param mode: file open mode, i.e "w" or "a+" :param encoding: file encoding i.e "utf8" :param expand: run "transform --help" to see command line interface explanation for detail :param filter_: run "transform --help" to see command line interface explanation for detail :param new_line: new_line seperator for each item, default is "\n" :param join_val: space seperator for each key in each item, default is " " :param kwargs: Example: ... txt_config = WTXTConfig("./result.txt") with ProcessFactory.create_writer(txt_config) as txt_writer: async for items in es_getter: txt_writer.write(items) """ super().__init__() self.filename = filename self.mode = mode self.encoding = encoding self.expand = expand self.filter = filter_ self.new_line = new_line self.join_val = join_val class WXLSXConfig(BaseWriterConfig): def __init__(self, filename, title=DefaultVal.title, expand=None, filter_=None, **kwargs): """ :param filename: filename to write :param title: sheet title :param expand: run "transform --help" to see command line interface explanation for detail :param filter_: run "transform --help" to see command line interface explanation for detail :param kwargs: Example: ... xlsx_config = WXLSXConfig("./result.xlsx") with ProcessFactory.create_writer(xlsx_config) as xlsx_writer: async for items in es_getter: xlsx_writer.write(items) """ super().__init__() self.filename = filename self.title = title self.expand = expand self.filter = filter_ PK`K<idataapi_transform/DataProcess/Config/ConfigUtil/__init__.pyPK1L<66idataapi_transform/DataProcess/DataGetter/APIGetter.pyimport re import json import random import logging import asyncio from .BaseGetter import BaseGetter headers = { "Accept-Encoding": "gzip", "Connection": "close" } class APIGetter(BaseGetter): def __init__(self, config): super().__init__() self.config = config self.base_url = self.config.source self.retry_count = 0 self.curr_size = 0 self.responses = list() self.done = False self.need_clear = False self.page_token = "" self.miss_count = 0 self.total_count = 0 def init_val(self): self.base_url = self.config.source self.retry_count = 0 self.curr_size = 0 self.responses = list() self.done = False self.need_clear = False self.page_token = "" self.miss_count = 0 self.total_count = 0 def generate_sub_func(self): def sub_func(match): return match.group(1) + self.page_token + match.group(3) return sub_func def update_base_url(self, key="pageToken"): if self.base_url[-1] == "/": self.base_url = self.base_url[:-1] elif self.base_url[-1] == "?": self.base_url = self.base_url[:-1] key += "=" if key not in self.base_url: if "?" not in self.base_url: self.base_url = self.base_url + "?" 
+ key + self.page_token else: self.base_url = self.base_url + "&" + key + self.page_token else: self.base_url = re.sub("(" + key + ")(.+?)($|&)", self.generate_sub_func(), self.base_url) def __aiter__(self): return self async def __anext__(self): if self.need_clear and self.responses: self.responses.clear() self.need_clear = False if self.done: logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.source, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration while True: try: async with self.config.session.get(self.base_url, headers=headers) as resp: text = await resp.text() result = json.loads(text) except Exception as e: logging.error("%s: %s" % (str(e), self.base_url)) await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) self.retry_count += 1 if self.retry_count <= self.config.max_retry: continue else: # fail logging.error("Give up, Unable to get url: %s, total get %d items, total filtered: %d items" % (self.base_url, self.total_count, self.miss_count)) self.done = True if self.responses: self.need_clear = True return self.responses else: return await self.__anext__() if "data" in result: # success self.retry_count = 0 origin_length = len(result["data"]) self.total_count += origin_length if self.config.filter: curr_response = [self.config.filter(i) for i in result["data"]] curr_response = [i for i in curr_response if i] self.miss_count += origin_length - len(curr_response) else: curr_response = result["data"] self.responses.extend(curr_response) self.curr_size += len(curr_response) # get next page if success, retry if fail if "pageToken" in result: if not result["pageToken"]: self.done = True if self.responses: self.need_clear = True return self.responses self.page_token = str(result["pageToken"]) self.update_base_url() elif "retcode" in result and result["retcode"] == "100002": self.done = True if self.responses: self.need_clear = True return self.responses logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.source, self.total_count, self.miss_count)) return await self.__anext__() else: if self.retry_count >= self.config.max_retry: logging.error("Give up, Unable to get url: %s, total get %d items, total filtered: %d items" % (self.base_url, self.total_count, self.miss_count)) self.done = True if self.responses: self.need_clear = True return self.responses self.retry_count += 1 await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) return await self.__anext__() if self.config.max_limit and self.curr_size > self.config.max_limit: self.done = self.need_clear = True return self.responses elif len(self.responses) >= self.config.per_limit: self.need_clear = True return self.responses elif self.done: # buffer has empty data, and done fetching return await self.__anext__() def __iter__(self): raise ValueError("APIGetter must be used with async generator, not normal generator") class APIBulkGetter(BaseGetter): def __init__(self, config): super().__init__() self.config = config self.api_configs = self.to_generator(self.config.configs) self.pending_tasks = list() self.buffers = list() self.success_task = 0 self.curr_size = 0 self.need_clear = False @staticmethod def to_generator(items): for i in items: yield i async def fetch_items(self, api_config): async for items in APIGetter(api_config): self.buffers.extend(items) def fill_tasks(self): if len(self.pending_tasks) >= self.config.concurrency: return for 
api_config in self.api_configs: self.pending_tasks.append(self.fetch_items(api_config)) if len(self.pending_tasks) >= self.config.concurrency: return def __aiter__(self): return self async def __anext__(self): if self.need_clear: self.buffers.clear() self.fill_tasks() while self.pending_tasks: done, pending = await asyncio.wait(self.pending_tasks, timeout=self.config.interval) self.pending_tasks = list(pending) self.success_task += len(done) if self.buffers: self.need_clear = True self.curr_size += len(self.buffers) return self.buffers else: # after interval seconds, no item fetched self.fill_tasks() logging.info("After %.2f seconds, no new item fetched, current done task: %d, pending tasks: %d" % (float(self.config.interval), self.success_task, len(self.pending_tasks))) continue logging.info("APIBulkGetter Done, total perform: %d tasks. fetch: %d items" % (self.success_task, self.curr_size)) raise StopAsyncIteration def __iter__(self): raise ValueError("APIBulkGetter must be used with async generator, not normal generator") PKYKUYUU7idataapi_transform/DataProcess/DataGetter/BaseGetter.pyimport abc from ..Meta.BaseDataProcess import BaseDataProcess class BaseGetter(BaseDataProcess, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): """ :param config config contains attribute: source: where to read data per_limit: return at most per_limit data each time max_limit: return at most max_limit data total """ pass @abc.abstractmethod def __aiter__(self): return self @abc.abstractmethod async def __anext__(self): pass PK<-L&hL L 6idataapi_transform/DataProcess/DataGetter/CSVGetter.pyimport csv import sys import logging from .BaseGetter import BaseGetter if sys.platform == "linux": csv.field_size_limit(sys.maxsize) class CSVGetter(BaseGetter): def __init__(self, config): super().__init__() self.config = config self.f_in = open(self.config.filename, self.config.mode, encoding=self.config.encoding) self.reader = csv.DictReader(self.f_in) self.done = False self.need_clear = False self.curr_size = 0 self.responses = list() self.miss_count = 0 self.total_count = 0 def init_val(self): self.done = False self.need_clear = False self.curr_size = 0 self.responses = list() self.f_in.seek(0, 0) self.miss_count = 0 self.total_count = 0 def __aiter__(self): return self async def __anext__(self): if self.need_clear: self.responses.clear() self.need_clear = False if self.done: logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration for row in self.reader: self.total_count += 1 if self.config.filter: row = self.config.filter(row) if not row: self.miss_count += 1 continue self.responses.append(row) if len(self.responses) > self.config.per_limit: self.curr_size += len(self.responses) self.need_clear = True return self.responses if self.config.max_limit and self.curr_size > self.config.max_limit: self.done = self.need_clear = True return self.responses if self.responses: self.done = self.need_clear = True return self.responses logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration def __iter__(self): for row in self.reader: self.total_count += 1 if self.config.filter: row = self.config.filter(row) if not row: self.miss_count += 1 continue self.responses.append(row) if len(self.responses) > self.config.per_limit: self.curr_size += 
len(self.responses) yield self.responses self.responses.clear() if self.config.max_limit and self.curr_size > self.config.max_limit: yield self.responses self.responses.clear() break if self.responses: yield self.responses logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() PK8L5idataapi_transform/DataProcess/DataGetter/ESGetter.pyimport asyncio import random import logging from .BaseGetter import BaseGetter from ..Config.MainConfig import main_config class ESScrollGetter(BaseGetter): def __init__(self, config): if not main_config.has_es_configured: raise ValueError("You must config es_hosts before using ESGetter, Please edit configure file: %s" % (main_config.ini_path, )) super().__init__(self) self.config = config self.es_client = config.es_client self.total_size = None self.result = None self.curr_size = 0 self.miss_count = 0 self.total_count = 0 def __aiter__(self): return self def init_val(self): self.total_size = None self.result = None self.curr_size = 0 self.miss_count = 0 self.total_count = 0 async def __anext__(self, retry=1): if self.total_size is None: self.result = await self.es_client.search(self.config.indices, self.config.doc_type, scroll=self.config.scroll, body=self.config.query_body) self.total_size = self.result['hits']['total'] self.total_size = self.config.max_limit if (self.config.max_limit and self.config.max_limit < self.result['hits']['total']) else self.total_size self.curr_size += len(self.result['hits']['hits']) logging.info("Get %d items from %s, percentage: %.2f%%" % (len(self.result['hits']['hits']), self.config.indices + "->" + self.config.doc_type, (self.curr_size / self.total_size * 100) if self.total_size else 0)) origin_length = len(self.result['hits']['hits']) self.total_count += origin_length if self.config.return_source: results = [i["_source"] for i in self.result['hits']['hits']] else: results = self.result if self.config.filter: results = [self.config.filter(i) for i in results] results = [i for i in results if i] self.miss_count += origin_length - len(results) return results if "_scroll_id" in self.result and self.result["_scroll_id"] and self.curr_size < self.total_size: try: self.result = await self.es_client.scroll(scroll_id=self.result["_scroll_id"], scroll=self.config.scroll) except Exception as e: if retry < self.config.max_retry: await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) return await self.__anext__(retry+1) else: logging.error("Give up es getter, After retry: %d times, still fail to get result: %s, " "total get %d items, total filtered: %d items" % (self.config.max_retry, str(e), self.total_count, self.miss_count)) raise StopAsyncIteration self.curr_size += len(self.result['hits']['hits']) logging.info("Get %d items from %s, percentage: %.2f%%" % (len(self.result['hits']['hits']), self.config.indices + "->" + self.config.doc_type, (self.curr_size / self.total_size * 100) if self.total_size else 0)) origin_length = len(self.result['hits']['hits']) self.total_count += origin_length if self.config.return_source: results = [i["_source"] for i in self.result['hits']['hits']] else: results = self.result if self.config.filter: results = [self.config.filter(i) for i in results] results = [i for i in results if i] self.miss_count += origin_length - len(results) return results logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.indices + "->" + 
self.config.doc_type, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration async def delete_all(self): """ inefficient delete """ body = { "query": { "match_all": {} } } result = await self.config.es_client.delete_by_query(index=self.config.indices, doc_type=self.config.doc_type, body=body, params={"conflicts": "proceed"}) return result def __iter__(self): raise ValueError("ESGetter must be used with async generator, not normal generator") PK<-LmGG7idataapi_transform/DataProcess/DataGetter/JsonGetter.pyimport json import logging from .BaseGetter import BaseGetter class JsonGetter(BaseGetter): def __init__(self, config): super().__init__(self) self.config = config self.responses = list() self.line_num = 0 self.curr_size = 0 self.need_clear = False self.done = False self.f_in = open(self.config.filename, self.config.mode, encoding=self.config.encoding) self.miss_count = 0 self.total_count = 0 def init_val(self): self.responses = list() self.line_num = 0 self.curr_size = 0 self.need_clear = False self.done = False self.f_in.seek(0, 0) self.miss_count = 0 self.total_count = 0 def __aiter__(self): return self async def __anext__(self): if self.need_clear: self.responses.clear() self.need_clear = False if self.done: logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration for line in self.f_in: self.line_num += 1 try: json_obj = json.loads(line) except json.decoder.JSONDecodeError: logging.error("JSONDecodeError. give up. line: %d" % (self.line_num, )) continue self.total_count += 1 if self.config.filter: json_obj = self.config.filter(json_obj) if not json_obj: self.miss_count += 1 continue self.responses.append(json_obj) self.curr_size += 1 if len(self.responses) > self.config.per_limit: self.need_clear = True return self.responses if self.config.max_limit and self.curr_size > self.config.max_limit: self.need_clear = self.done = True return self.responses self.need_clear = self.done = True if self.responses: return self.responses logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration def __iter__(self): for line in self.f_in: self.line_num += 1 try: json_obj = json.loads(line) except json.decoder.JSONDecodeError: logging.error("JSONDecodeError. give up. 
line: %d" % (self.line_num, )) continue self.total_count += 1 if self.config.filter: json_obj = self.config.filter(json_obj) if not json_obj: self.miss_count += 1 continue self.responses.append(json_obj) self.curr_size += 1 if len(self.responses) > self.config.per_limit: yield self.responses self.responses.clear() if self.config.max_limit and self.curr_size > self.config.max_limit: yield self.responses self.responses.clear() break if self.responses: yield self.responses logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() def __del__(self): self.f_in.close() PK<-LayJ>>7idataapi_transform/DataProcess/DataGetter/XLSXGetter.pyimport logging from openpyxl import load_workbook from .BaseGetter import BaseGetter class XLSXGetter(BaseGetter): def __init__(self, config): super().__init__() self.config = config self.wb = load_workbook(filename=self.config.filename, read_only=True) if not self.wb.worksheets: raise ValueError("Empty file: %s" % (self.config.filename, )) self.sheet = self.wb.worksheets[self.config.sheet_index] self.headers = self.generate_headers() self.max_row = self.sheet.max_row if self.config.max_limit and self.config.max_limit > self.max_row: self.max_row = self.config.max_limit + 1 # add first headers self.row_num = 0 self.responses = list() self.curr_size = 0 self.need_clear = False self.done = False self.miss_count = 0 self.total_count = 0 def init_val(self): self.row_num = 0 self.responses = list() self.curr_size = 0 self.need_clear = False self.done = False self.miss_count = 0 self.total_count = 0 def __aiter__(self): return self def __anext__(self): if self.need_clear: self.responses.clear() self.need_clear = False if self.done: logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration while self.row_num < self.max_row: if self.row_num == 0: continue self.row_num += 1 self.total_count += 1 row = self.get_row(self.row_num) if self.config.filter: row = self.config.filter(row) if not row: self.miss_count += 1 continue self.responses.append(row) if len(self.responses) > self.config.per_limit: self.curr_size += len(self.responses) self.need_clear = True return self.responses if self.responses: self.need_clear = self.done = True return self.responses logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() raise StopAsyncIteration def generate_headers(self): keys = list() for index in range(self.sheet.max_column): cell_index = index + 1 keys.append(self.sheet._get_cell(1, cell_index).value) return keys def get_row(self, row_num): item = dict() for index in range(len(self.headers)): item[self.headers[index]] = self.sheet._get_cell(row_num, index+1).value return item def __iter__(self): for row_num in range(self.max_row): if row_num == 0: continue row_num += 1 self.total_count += 1 row = self.get_row(row_num) if self.config.filter: row = self.config.filter(row) if not row: self.miss_count += 1 continue self.responses.append(row) if len(self.responses) > self.config.per_limit: self.curr_size += len(self.responses) yield self.responses self.responses.clear() if self.responses: yield self.responses logging.info("get source done: %s, total get %d items, total filtered: %d items" % (self.config.filename, self.total_count, self.miss_count)) self.init_val() def 
__del__(self): self.wb.close() PK2K5idataapi_transform/DataProcess/DataGetter/__init__.pyPK{KV7idataapi_transform/DataProcess/DataWriter/BaseWriter.pyimport abc from ..Meta.BaseDataProcess import BaseDataProcess class BaseWriter(BaseDataProcess, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): """ :param config """ pass @abc.abstractmethod async def write(self, responses): pass @abc.abstractmethod def __enter__(self): pass @abc.abstractmethod def __exit__(self, exc_type, exc_val, exc_tb): pass PKU+LO O 6idataapi_transform/DataProcess/DataWriter/CSVWriter.pyimport csv import types import logging from .BaseWriter import BaseWriter class CSVWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.f_out = open(self.config.filename, self.config.mode, encoding=self.config.encoding, newline="") self.f_csv = None self.headers = dict() if not self.config.headers else self.config.headers self.total_miss_count = 0 self.success_count = 0 def write(self, responses): miss_count = 0 # filter if self.config.filter: new_result = list() for each_response in responses: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue new_result.append(each_response) responses = new_result self.total_miss_count += miss_count # all filtered if not responses: logging.info("%s write 0 item, filtered %d item" % (self.config.filename, miss_count)) return # expand if self.config.expand: responses = [self.expand_dict(i, max_expand=self.config.expand) for i in responses] else: responses = [i for i in responses] if isinstance(responses, types.GeneratorType) else responses # headers if not self.f_csv: if not self.headers: self.headers = self.generate_headers(responses) self.f_csv = csv.DictWriter(self.f_out, self.headers) self.f_csv.writeheader() # encoding process for each_response in responses: for k, v in each_response.items(): if v is None: each_response[k] = "" elif self.config.qsn and v != "" and (isinstance(v, (int, float)) or isinstance(v, str) and all(i.isdigit() for i in v)): each_response[k] = repr(str(v)) elif self.config.encoding not in ("utf8", "utf-8"): each_response[k] = str(v).encode(self.config.encoding, "ignore").decode(self.config.encoding) self.success_count += 1 self.f_csv.writerow(each_response) logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count)) @staticmethod def generate_headers(responses): headers = set() for r in responses: for key in r.keys(): headers.add(key) return list(headers) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.f_out.close() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.filename, self.total_miss_count, self.success_count)) PK/LְԀ  5idataapi_transform/DataProcess/DataWriter/ESWriter.pyimport logging from .BaseWriter import BaseWriter from ..Config.MainConfig import main_config class ESWriter(BaseWriter): def __init__(self, config): if not main_config.has_es_configured: raise ValueError("You must config es_hosts before using ESWriter, Please edit configure file: %s" % (main_config.ini_path, )) super().__init__() self.config = config self.total_miss_count = 0 self.success_count = 0 self.fail_count = 0 async def write(self, responses): origin_length = len(responses) if self.config.filter: responses = [self.config.filter(i) for i in responses] responses = [i for i in responses if i] miss_count = origin_length - len(responses) self.total_miss_count += 
miss_count if responses: if self.config.expand: responses = [self.expand_dict(i) for i in responses] success, fail, response = await self.config.es_client.add_dict_to_es( self.config.indices, self.config.doc_type, responses, self.config.id_hash_func, self.config.app_code, self.config.actions, self.config.create_date, self.config.error_if_fail, self.config.timeout) if response is not None: self.success_count += success self.fail_count += fail logging.info("Write %d items to index: %s, doc_type: %s, fail: %d, filtered: %d" % ( len(responses), self.config.indices, self.config.doc_type, fail, miss_count)) else: # exception happened logging.info("Write 0 items to index: %s, doc_type: %s" % (self.config.indices, self.config.doc_type)) return response else: # all filtered, or pass empty result logging.info("Write 0 items to index: %s, doc_type: %s" % (self.config.indices, self.config.doc_type)) async def delete_all(self, body=None): """ inefficient delete """ if not body: body = { "query": { "match_all": {} } } result = await self.config.es_client.delete_by_query(index=self.config.indices, doc_type=self.config.doc_type, body=body, params={"conflicts": "proceed"}) return result def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): logging.info("%s->%s write done, total filtered %d item, total write %d item, total fail: %d item" % (self.config.indices, self.config.doc_type, self.total_miss_count, self.success_count, self.fail_count)) PKU+L ??7idataapi_transform/DataProcess/DataWriter/JsonWriter.pyimport json import logging from .BaseWriter import BaseWriter class JsonWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.total_miss_count = 0 self.success_count = 0 self.f_out = open(self.config.filename, self.config.mode, encoding=self.config.encoding) def write(self, responses): miss_count = 0 for each_response in responses: if self.config.expand: each_response = self.expand_dict(each_response, max_expand=self.config.expand) if self.config.filter: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue self.f_out.write(json.dumps(each_response) + self.config.new_line) self.success_count += 1 self.total_miss_count += miss_count logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count)) def __exit__(self, exc_type, exc_val, exc_tb): self.f_out.close() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.filename, self.total_miss_count, self.success_count)) def __enter__(self): return self PKU+LA\UU6idataapi_transform/DataProcess/DataWriter/TXTWriter.pyimport logging from .BaseWriter import BaseWriter class TXTWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.f_out = open(config.filename, config.mode, encoding=config.encoding) self.total_miss_count = 0 self.success_count = 0 def write(self, responses): miss_count = 0 for each_response in responses: if self.config.expand: each_response = self.expand_dict(each_response, max_expand=self.config.expand) if self.config.filter: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue self.f_out.write(self.config.join_val.join(str(value) for value in each_response.values()) + self.config.new_line) self.success_count += 1 self.total_miss_count += miss_count logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count)) def __exit__(self, exc_type, exc_val, 
idataapi_transform/DataProcess/DataWriter/XLSXWriter.py

import logging
from openpyxl import Workbook
from .BaseWriter import BaseWriter

_warning = False


class XLSXWriter(BaseWriter):
    def __init__(self, config):
        global _warning
        super().__init__()
        self.config = config
        self.wb = Workbook()
        self.ws1 = self.wb.active
        self.ws1.title = config.title
        self.col_dict = dict()
        self.row = 2
        self.total_miss_count = 0
        self.success_count = 0
        if not _warning:
            logging.warning("XLSXWriter will actually write to file when __exit__ of XLSXWriter called")
            _warning = True

    def write(self, responses):
        miss_count = 0
        for each_response in responses:
            if self.config.expand:
                each_response = self.expand_dict(each_response, max_expand=self.config.expand)
            if self.config.filter:
                each_response = self.config.filter(each_response)
                if not each_response:
                    miss_count += 1
                    continue
            for key, value in each_response.items():
                if key not in self.col_dict:
                    self.col_dict[key] = len(self.col_dict) + 1
                    self.ws1.cell(row=1, column=self.col_dict[key], value=key)
                value = str(value) if value is not None else ""
                self.ws1.cell(row=self.row, column=self.col_dict[key], value=value)
            self.row += 1
            self.success_count += 1
        self.total_miss_count += miss_count  # accumulate so __exit__ reports the real filtered total
        logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count))

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.wb.save(filename=self.config.filename)
        self.wb.close()
        logging.info("%s write done, total filtered %d item, total write %d item" % (
            self.config.filename, self.total_miss_count, self.success_count))

    def __enter__(self):
        return self


idataapi_transform/DataProcess/DataWriter/__init__.py


idataapi_transform/DataProcess/Meta/BaseDataProcess.py

class BaseDataProcess(object):
    @staticmethod
    def expand_dict(origin_item, max_expand=0, current_expand=0, parent_key=None, parent_item=None):
        if max_expand == 0:
            return origin_item
        if max_expand != -1 and current_expand >= max_expand:
            return origin_item

        if parent_key:
            if isinstance(origin_item, dict):
                for sub_k, sub_v in origin_item.items():
                    parent_item[parent_key + "_" + sub_k] = sub_v
                if parent_key in parent_item:
                    del parent_item[parent_key]
            elif isinstance(origin_item, list):
                for item in origin_item:
                    BaseDataProcess.expand_dict(item, max_expand, current_expand + 1, parent_key, parent_item)
            return origin_item

        keys = [k for k in origin_item.keys()]
        has_sub_dict = False
        for k in keys:
            if isinstance(origin_item[k], dict):
                has_sub_dict = True
                sub_dict = origin_item[k]
                for sub_k, sub_v in sub_dict.items():
                    origin_item[k + "_" + sub_k] = sub_v
                del origin_item[k]
            elif isinstance(origin_item[k], list):
                for item in origin_item[k]:
                    BaseDataProcess.expand_dict(item, max_expand, current_expand + 1, k, origin_item)
        if has_sub_dict:
            return BaseDataProcess.expand_dict(origin_item, max_expand, current_expand + 1)
        else:
            return origin_item


idataapi_transform/DataProcess/Meta/__init__.py
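BaseDataProcess.expand_dict is what every writer calls when expand is configured: it flattens nested dicts in place, joining keys with an underscore, up to the requested depth (-1 means unlimited, 0 means leave the item alone), and recurses into dicts found inside lists as well. A short illustrative run on made-up data:

from idataapi_transform.DataProcess.Meta.BaseDataProcess import BaseDataProcess

record = {"id": 1, "profile": {"city": "Beijing", "age": 30}}

# max_expand=-1 -> flatten every level; the dict is mutated in place and returned
flat = BaseDataProcess.expand_dict(record, max_expand=-1)
print(flat)        # {'id': 1, 'profile_city': 'Beijing', 'profile_age': 30}

# the default max_expand=0 returns the item unchanged
print(BaseDataProcess.expand_dict({"a": {"b": 1}}))    # {'a': {'b': 1}}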
idataapi_transform-1.0.2.dist-info/entry_points.txt
(compressed payload omitted)

idataapi_transform-1.0.2.dist-info/LICENSE

The MIT License (MIT)

Copyright (c) 2017 zpoint

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
documentation files (the "Software"), to deal in the Software without restriction, including without limitation
the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of
the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
IN THE SOFTWARE.

idataapi_transform-1.0.2.dist-info/WHEEL
idataapi_transform-1.0.2.dist-info/METADATA
idataapi_transform-1.0.2.dist-info/RECORD
(compressed payloads omitted)
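Looking back at the ES writer at the start of this section, delete_all simply forwards a query body to delete_by_query with conflicts=proceed, defaulting to match_all. Below is a hedged sketch of calling it with a narrower body; it assumes an already-built ES writer instance (here named es_writer) whose config holds a working async es_client, which this snippet does not construct, and the field/value in the query are hypothetical.

import asyncio

async def purge(es_writer):
    # hypothetical field/value; any body accepted by delete_by_query works here
    body = {"query": {"term": {"status": {"value": "expired"}}}}
    return await es_writer.delete_all(body=body)

# asyncio.get_event_loop().run_until_complete(purge(es_writer))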