PKP$LT4idataapi_transform/__init__.py"""convert data from a format to another format, read/write from file or database, suitable for iDataAPI""" from .cli import main __version__ = '0.5' PK4W#L9"GGidataapi_transform/cli.pyimport json import asyncio import argparse from .DataProcess.Config.DefaultValue import DefaultVal from .DataProcess.Config.ConfigUtil import GetterConfig from .DataProcess.Config.ConfigUtil import WriterConfig from .DataProcess.ProcessFactory import ProcessFactory class Args(object): from_choices = ["API", "ES", "CSV", "XLSX", "JSON"] from_desc = "argument 'from' can only set to one of 'API', 'ES', 'CSV', 'XLSX', " \ "or 'JSON'(means json line by line file)" to_choices = ["csv", "xlsx", "json", "txt", "es"] to_desc = "argument 'to' can only set to one of \"csv\", \"xlsx\", \"json\", \"txt\" \"es\", \"json\", will write "\ "'json.dumps(item)' line by line. \"txt\" will write each item line by line, each element in each line " \ "is separated by 'space' bu default" source_desc = """ argument 'source', When argument '-from' set to 'ES', source should be 'index:doc_type' When argument 'from' set tp 'API', source should be 'http://... argument 'from' set to others, source should be file path """ dest_desc = "argument 'dest', filename to save result, no need for suffix, " \ "ie '/Desktop/result', default: './result'\n" \ "When argument '-to' set to 'ES', dest should be 'index:doc_type'" per_limit_desc = "amount of data buffered, when buffer filled, Program will write buffered data to 'dest', default 100" max_limit_desc = "write at most 'max_limit' data to 'dest', if 'max_limit' set to 0, means no limit, default to None" retry_desc = "when fetch data failed, retry at most 'retry' time, default 3" r_encoding_desc = "encoding of input file, ignore for xlsx format, default 'utf8'" w_encoding_desc = "encoding of output file, ignore for xlsx format, default 'utf8'" filter_desc = "file contains a 'my_filter(item)' function for filter" param_file_desc = """When you have many item save in id.json, --param_file './id.json::id::pid' means open './id.json ', read each json object line by line, use each_json['id'] as the parameter 'pid' and add it to the tail part of 'source'. --param_file can be either "filename.json::json_param::request_param" or "filename.txt::request_param" """ expand_desc = """If your item is {"a": {"b": "c"}, "b": "d"}, --expand 1 will make your item become {"a_b": "c", "b": "d"}, --expand N means expand at most N level deep of your object, --expand -1 means expand all level -- expand 0 means no expand of your item. Default 0. """ qsn_desc = """quote scientific notation, ie: 4324234234234234123123 will become 4.32423423423423E+021 in normal csv, If quote like '4324234234234234123123', it won't become scientific notation, Only work for output format 'csv' --qsn True means quote scientific notation, --qsn False means not quote scientific notation""" query_body_desc = """ElasticSearch query body, size has same function as "--limit", i.e: body = { "size": 100, "_source": { "includes": ["location", "title", "city", "id"] }, "query": { "bool": { "must": [ { "term": {"appCode": {"value": "ctrip"}} } ] } } } """ write_mode_desc = """'w' or 'a+'""" getter_config_map = { Args.from_choices[0]: GetterConfig.RAPIConfig, Args.from_choices[1]: GetterConfig.RESConfig, Args.from_choices[2]: GetterConfig.RCSVConfig, Args.from_choices[3]: GetterConfig.RXLSXConfig, Args.from_choices[4]: GetterConfig.RJsonConfig } writer_config_map = { Args.to_choices[0]: WriterConfig.WCSVConfig, Args.to_choices[1]: WriterConfig.WXLSXConfig, Args.to_choices[2]: WriterConfig.WJsonConfig, Args.to_choices[3]: WriterConfig.WJsonConfig, Args.to_choices[4]: WriterConfig.WESConfig } def get_arg(): parser = argparse.ArgumentParser(prog="idataapi_transform", description='convert data from a format to another format, ' 'read/write from file or database, suitable for iDataAPI') parser.add_argument("from", choices=Args.from_choices, help=Args.from_desc, type=str.upper) parser.add_argument("to", choices=Args.to_choices, help=Args.to_desc, type=str.lower) parser.add_argument("source", help=Args.source_desc) parser.add_argument("dest", help=Args.dest_desc, default=DefaultVal.dest, nargs="?") parser.add_argument("--per_limit", default=DefaultVal.per_limit, type=int, help=Args.per_limit_desc) parser.add_argument("--max_limit", default=DefaultVal.max_limit, type=int, help=Args.max_limit_desc) parser.add_argument("--max_retry", default=DefaultVal.max_retry, type=int, help=Args.retry_desc) parser.add_argument("--r_encoding", default=DefaultVal.default_encoding, help=Args.r_encoding_desc) parser.add_argument("--w_encoding", default=DefaultVal.default_encoding, help=Args.w_encoding_desc) parser.add_argument("--filter", default=None, help=Args.filter_desc) parser.add_argument("--expand", default=None, type=int, help=Args.expand_desc) parser.add_argument("--qsn", default=None, type=bool, help=Args.qsn_desc) parser.add_argument("--query_body", default=DefaultVal.query_body, type=str, help=Args.query_body_desc) parser.add_argument("--write_mode", default=DefaultVal.default_file_mode_w, type=str, help=Args.write_mode_desc) return parser.parse_args() def get_filter(filter_file): if not filter_file: return None with open(filter_file, "r") as f: exec(f.read()) func = locals()["my_filter"] return func async def getter_to_writer(getter, writer): with writer as safe_writer: async for items in getter: if asyncio.iscoroutinefunction(safe_writer.write): await safe_writer.write(items) else: safe_writer.write(items) def main(): args = get_arg() from_ = getattr(args, "from") from_args = list() from_kwargs = dict() to_args = list() to_kwargs = dict() if from_ != Args.from_choices[0]: # not api from_args.extend(args.source.split(":")) else: from_args.extend([args.source]) from_kwargs["encoding"] = args.r_encoding if args.query_body: try: from_kwargs["query_body"] = json.loads(args.query_body) except Exception as e: raise SyntaxError("--query_body must be json serialized") for key in ("per_limit", "max_limit", "max_retry"): from_kwargs[key] = getattr(args, key) to_kwargs["filter_"] = get_filter(args.filter) to_kwargs["encoding"] = args.w_encoding to_kwargs["mode"] = args.write_mode for key in ("max_retry", "expand", "qsn"): to_kwargs[key] = getattr(args, key) if from_ not in getter_config_map: raise ValueError("argument from must be in %s" % (str(Args.from_choices), )) getter_config = getter_config_map[from_](*from_args, **from_kwargs) getter = ProcessFactory.create_getter(getter_config) if args.to != Args.to_choices[-1]: dest = args.dest + "." + args.to to_args.append(dest) else: indices, doc_type = args.dest.split(":") to_args.append(indices) to_args.append(doc_type) writer_config = writer_config_map[args.to](*to_args, **to_kwargs) writer = ProcessFactory.create_writer(writer_config) loop = asyncio.get_event_loop() loop.run_until_complete(getter_to_writer(getter, writer)) if __name__ == "__main__": main() PKsX#L6U= = 0idataapi_transform/DataProcess/ProcessFactory.py# config from .Config.MainConfig import main_config from .Config.ConfigUtil import GetterConfig from .Config.ConfigUtil import WriterConfig from .DataGetter.ESGetter import ESScrollGetter from .DataGetter.CSVGetter import CSVGetter from .DataGetter.APIGetter import APIGetter, APIBulkGetter from .DataGetter.JsonGetter import JsonGetter from .DataGetter.XLSXGetter import XLSXGetter from .DataWriter.CSVWriter import CSVWriter from .DataWriter.ESWriter import ESWriter from .DataWriter.JsonWriter import JsonWriter from .DataWriter.TXTWriter import TXTWriter from .DataWriter.XLSXWriter import XLSXWriter class ProcessFactory(object): @staticmethod def create_getter(config): """ create a getter based on config :return: getter """ if isinstance(config, GetterConfig.RAPIConfig): return APIGetter(config) elif isinstance(config, GetterConfig.RCSVConfig): return CSVGetter(config) elif isinstance(config, GetterConfig.RESConfig): return ESScrollGetter(config) elif isinstance(config, GetterConfig.RJsonConfig): return JsonGetter(config) elif isinstance(config, GetterConfig.RXLSXConfig): return XLSXGetter(config) elif isinstance(config, GetterConfig.RAPIBulkConfig): return APIBulkGetter(config) else: raise ValueError("create_getter must pass one of the instance of [RAPIConfig, RCSVConfig, " "RESConfig, RJsonConfig, RXLSXConfig, APIBulkGetter]") @staticmethod def create_writer(config): """ create a writer based on config :return: a writer """ if isinstance(config, WriterConfig.WCSVConfig): return CSVWriter(config) elif isinstance(config, WriterConfig.WESConfig): return ESWriter(config) elif isinstance(config, WriterConfig.WJsonConfig): return JsonWriter(config) elif isinstance(config, WriterConfig.WTXTConfig): return TXTWriter(config) elif isinstance(config, WriterConfig.WXLSXConfig): return XLSXWriter(config) else: raise ValueError("create_writer must pass one of the instance of [WCSVConfig, WESConfig, WJsonConfig, " "WTXTConfig, WXLSXConfig]") PK狗K*idataapi_transform/DataProcess/__init__.pyPKmUKN]8idataapi_transform/DataProcess/Config/ConnectorConfig.pyimport aiohttp import asyncio from .MainConfig import main_config main_config = main_config() default_concurrency_limit = main_config["main"].getint("concurrency") class _SessionManger(object): def __init__(self, concurrency_limit=default_concurrency_limit, loop=None): self.session = self._generate_session(concurrency_limit=concurrency_limit, loop=loop) @staticmethod def _generate_connector(limit=default_concurrency_limit, loop=None): """ https://github.com/KeepSafe/aiohttp/issues/883 if connector is passed to session, it is not available anymore """ if not loop: loop = asyncio.get_event_loop() return aiohttp.TCPConnector(limit=limit, loop=loop) @staticmethod def _generate_session(concurrency_limit=default_concurrency_limit, loop=None): if not loop: loop = asyncio.get_event_loop() return aiohttp.ClientSession(connector=_SessionManger._generate_connector(limit=concurrency_limit, loop=loop), loop=loop) def get_session(self): return self.session def __del__(self): self.session.close() session_manger = _SessionManger() PKxO$L\ws5idataapi_transform/DataProcess/Config/DefaultValue.pyimport os from .MainConfig import main_config base_config = main_config() class DefaultVal(object): per_limit = base_config["main"].getint("per_limit") max_limit = base_config["main"].get("max_limit") if max_limit != "None": max_limit = int(max_limit) else: max_limit = None max_retry = base_config["main"].getint("max_retry") random_min_sleep = base_config["main"].getint("random_min_sleep") random_max_sleep = base_config["main"].getint("random_max_sleep") default_file_mode_r = "r" default_file_mode_w = "w" default_encoding = "utf8" new_line = "\n" join_val = " " title = "example" qsn = None query_body = None dest = os.getcwd() + "/result" interval = 5 concurrency = 50 PK#L1idataapi_transform/DataProcess/Config/ESConfig.pyimport asyncio import aiohttp import hashlib import json import logging from elasticsearch_async.connection import AIOHttpConnection as OriginAIOHttpConnection from aiohttp.client_exceptions import ServerFingerprintMismatch from elasticsearch.exceptions import ConnectionError, ConnectionTimeout, SSLError from elasticsearch.compat import urlencode from elasticsearch_async import AsyncElasticsearch es_hosts = None def init_es(hosts, es_headers, timeout_): global es_hosts, AsyncElasticsearch es_hosts = hosts if not es_hosts: return False class AIOHttpConnection(OriginAIOHttpConnection): """ Override default AIOHttpConnection.perform_request to add headers """ @asyncio.coroutine def perform_request(self, method, url, params=None, body=None, timeout=None, ignore=()): url_path = url if params: url_path = '%s?%s' % (url, urlencode(params or {})) url = self.base_url + url_path start = self.loop.time() response = None try: with aiohttp.Timeout(timeout_ or timeout or self.timeout): response = yield from self.session.request(method, url, data=body, headers=es_headers) raw_data = yield from response.text() duration = self.loop.time() - start except Exception as e: self.log_request_fail(method, url, url_path, body, self.loop.time() - start, exception=e) if isinstance(e, ServerFingerprintMismatch): raise SSLError('N/A', str(e), e) if isinstance(e, asyncio.TimeoutError): raise ConnectionTimeout('TIMEOUT', str(e), e) raise ConnectionError('N/A', str(e), e) finally: if response is not None: yield from response.release() # raise errors based on http status codes, let the client handle those if needed if not (200 <= response.status < 300) and response.status not in ignore: self.log_request_fail(method, url, url_path, body, duration, status_code=response.status, response=raw_data) self._raise_error(response.status, raw_data) self.log_request_success(method, url, url_path, body, response.status, raw_data, duration) return response.status, response.headers, raw_data class MyAsyncElasticsearch(AsyncElasticsearch): @staticmethod def default_id_hash_func(item): if "appCode" in item and item["appCode"] and "id" in item and item["id"]: value = (item["appCode"] + item["id"]).encode("utf8") else: value = str(item).encode("utf8") return hashlib.md5(value).hexdigest() async def add_dict_to_es(self, indices, doc_type, items, id_hash_func=None, app_code=None): if not id_hash_func: id_hash_func = self.default_id_hash_func body = "" for item in items: if app_code: item["appCode"] = app_code action = { "index": { "_index": indices, "_type": doc_type, "_id": id_hash_func(item) } } body += json.dumps(action) + "\n" + json.dumps(item) + "\n" try: r = await self.transport.perform_request("POST", "/_bulk?pretty", body=body) return r except Exception as e: logging.error("elasticsearch Exception, give up: %s" % (str(e), )) return None if es_headers: OriginAIOHttpConnection.perform_request = AIOHttpConnection.perform_request AsyncElasticsearch = MyAsyncElasticsearch return True def get_es_client(): return AsyncElasticsearch(hosts=es_hosts) PK⌞K S2idataapi_transform/DataProcess/Config/LogConfig.pyimport os import logging from logging.handlers import RotatingFileHandler format_str = "%(asctime)s %(filename)s[line:%(lineno)d] %(levelname)s %(message)s" date_formatter_str = '[%Y-%m-%d %H:%M:%S]' formatter = logging.Formatter(format_str, datefmt=date_formatter_str) class SingleLevelFilter(logging.Filter): def __init__(self, passlevel, reject): super(SingleLevelFilter, self).__init__() self.passlevel = passlevel self.reject = reject def filter(self, record): if self.reject: return record.levelno != self.passlevel else: return record.levelno == self.passlevel def init_log(log_dir, max_log_file_bytes, ini_path): root_logger = logging.getLogger() root_logger.setLevel(logging.INFO) # console console = logging.StreamHandler() console.setFormatter(formatter) root_logger.addHandler(console) if log_dir: if not os.path.exists(log_dir): logging.error("log_dir(%s) in configure file(%s) not exists, I will not log to file" % (log_dir, ini_path)) return False if not max_log_file_bytes: logging.error("log_byte not set, please configure log_byte in configure file(%s), " "or I will not log to file" % (ini_path, )) return False # info h1 = RotatingFileHandler("%s/info.log" % (log_dir, ), mode="a", maxBytes=max_log_file_bytes, encoding="utf8", backupCount=1) h1.setFormatter(formatter) f1 = SingleLevelFilter(logging.INFO, False) h1.addFilter(f1) root_logger.addHandler(h1) # error h1 = RotatingFileHandler("%s/error.log" % (log_dir, ), mode="a", maxBytes=max_log_file_bytes, encoding="utf8", backupCount=1) h1.setFormatter(formatter) f1 = SingleLevelFilter(logging.ERROR, False) h1.addFilter(f1) root_logger.addHandler(h1) return True return False PK5c#Leܒ 3idataapi_transform/DataProcess/Config/MainConfig.pyimport os import json import configparser from os.path import expanduser from .LogConfig import init_log from .ESConfig import init_es default_configure_content = """ [main] # default max concurrency value for APIGetter concurrency = 50 # buffer size per_limit = 100 # fetch at most max_limit items max_limit = None # max retry for getter before give up if fail to get data max_retry = 3 # sleep interval if fail random_min_sleep = 1 random_max_sleep = 3 [es] # elasticsearch host # hosts = ["localhost:9393"] # elasticsearch headers when perform http request # headers = {"Host": "localhost", "value": "value"} # request timeout, seconds # timeout = 10 [log] # a directory to save log file # path = /Users/zpoint/Desktop/idataapi-transform/logs/ # max byte per log file # log_byte = 5242880 """ class MainConfig(object): def __init__(self, ini_path=None): # singleton if not hasattr(self, "__instance"): self.ini_path = ini_path if not self.ini_path: home = expanduser("~") self.ini_path = home + "/idataapi-transform.ini" if not os.path.exists(self.ini_path): with open(self.ini_path, "w") as f: f.write(default_configure_content) self.__instance = configparser.ConfigParser() self.__instance.read(self.ini_path) MainConfig.__instance = self.__instance self.has_log_file = self.config_log() self.has_es_configured = self.config_es() def __call__(self): return self.__instance def config_log(self): max_log_file_bytes = self.__instance["log"].getint("log_byte") log_path = self.__instance["log"].get("path") return init_log(log_path, max_log_file_bytes, self.ini_path) def config_es(self): hosts = self.__instance["es"].get("hosts") timeout = self.__instance["es"].getint("timeout") if hosts: try: hosts = json.loads(hosts) except Exception as e: raise ValueError("es host must be json serialized") headers = self.__instance["es"].get("headers") if headers and headers != "None": try: headers = json.loads(headers) except Exception as e: raise ValueError("es headers must be json serialized") else: headers = None return init_es(hosts, headers, timeout) main_config = MainConfig() PK VK1idataapi_transform/DataProcess/Config/__init__.pyPKZK@>idataapi_transform/DataProcess/Config/ConfigUtil/BaseConfig.pyimport abc class BaseGetterConfig(object, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): pass class BaseWriterConfig(object, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): pass PK[#L= self.config.max_retry: if self.responses: self.done = self.need_clear = True return self.responses self.retry_count += 1 continue else: if self.retry_count >= self.config.max_retry: logging.error("Unable to get url: %s " % (self.base_url, )) if self.responses: self.done = self.need_clear = True return self.responses self.retry_count += 1 await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) continue if self.config.max_limit and self.curr_size > self.config.max_limit: self.done = self.need_clear = True return self.responses elif len(self.responses) >= self.config.per_limit: self.need_clear = True return self.responses elif self.done: # buffer has empty data, and done fetching raise StopAsyncIteration def __iter__(self): raise ValueError("APIGetter must be used with async generator, not normal generator") class APIBulkGetter(BaseGetter): def __init__(self, config): super().__init__() self.config = config self.api_configs = self.to_generator(self.config.configs) self.pending_tasks = list() self.buffers = list() self.success_task = 0 self.curr_size = 0 self.need_clear = False @staticmethod def to_generator(items): for i in items: yield i async def fetch_items(self, api_config): async for items in APIGetter(api_config): self.buffers.extend(items) def fill_tasks(self): if len(self.pending_tasks) >= self.config.concurrency: return for api_config in self.api_configs: self.pending_tasks.append(self.fetch_items(api_config)) if len(self.pending_tasks) >= self.config.concurrency: return def __aiter__(self): return self async def __anext__(self): if self.need_clear: self.buffers.clear() self.fill_tasks() while self.pending_tasks: done, pending = await asyncio.wait(self.pending_tasks, timeout=self.config.interval) self.pending_tasks = list(pending) self.success_task += len(done) if self.buffers: self.need_clear = True self.curr_size += len(self.buffers) return self.buffers else: # after interval seconds, no item fetched self.fill_tasks() logging.info("After %.2f seconds, no new item fetched, current success task: %d, pending tasks: %d" % (float(self.config.interval), self.success_task, len(self.pending_tasks))) continue logging.info("APIBulkGetter Done, total perform: %d tasks. fetch: %d items" % (self.success_task, self.curr_size)) raise StopAsyncIteration def __iter__(self): raise ValueError("APIBulkGetter must be used with async generator, not normal generator") PKYKUYUU7idataapi_transform/DataProcess/DataGetter/BaseGetter.pyimport abc from ..Meta.BaseDataProcess import BaseDataProcess class BaseGetter(BaseDataProcess, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): """ :param config config contains attribute: source: where to read data per_limit: return at most per_limit data each time max_limit: return at most max_limit data total """ pass @abc.abstractmethod def __aiter__(self): return self @abc.abstractmethod async def __anext__(self): pass PK9dK |  6idataapi_transform/DataProcess/DataGetter/CSVGetter.pyimport csv import sys import logging from .BaseGetter import BaseGetter if sys.platform == "linux": csv.field_size_limit(sys.maxsize) class CSVGetter(BaseGetter): def __init__(self, config): super().__init__() self.config = config self.f_in = open(self.config.filename, self.config.mode, encoding=self.config.encoding) self.reader = csv.DictReader(self.f_in) self.done = False self.need_clear = False self.curr_size = 0 self.responses = list() def init_val(self): self.done = False self.need_clear = False self.curr_size = 0 self.responses = list() self.f_in.seek(0, 0) def __aiter__(self): return self async def __anext__(self): if self.need_clear: self.responses.clear() self.need_clear = False if self.done: self.init_val() logging.info("get source done: %s" % (self.config.filename, )) raise StopAsyncIteration for row in self.reader: self.responses.append(row) if len(self.responses) > self.config.per_limit: self.curr_size += len(self.responses) self.need_clear = True return self.responses if self.config.max_limit and self.curr_size > self.config.max_limit: self.done = self.need_clear = True return self.responses if self.responses: self.done = self.need_clear = True return self.responses self.init_val() logging.info("get source done: %s" % (self.config.filename,)) raise StopAsyncIteration def __iter__(self): for row in self.reader: self.responses.append(row) if len(self.responses) > self.config.per_limit: self.curr_size += len(self.responses) yield self.responses self.responses.clear() if self.config.max_limit and self.curr_size > self.config.max_limit: yield self.responses self.responses.clear() break if self.responses: yield self.responses self.init_val() logging.info("get source done: %s" % (self.config.filename,)) PK{#L2 2 5idataapi_transform/DataProcess/DataGetter/ESGetter.pyimport asyncio import random import logging from .BaseGetter import BaseGetter from ..Config.MainConfig import main_config class ESScrollGetter(BaseGetter): def __init__(self, config): if not main_config.has_es_configured: raise ValueError("You must config es_hosts before using ESGetter, Please edit configure file: %s" % (main_config.ini_path, )) super().__init__(self) self.config = config self.es_client = config.es_client self.total_size = None self.result = None self.curr_size = 0 def __aiter__(self): return self def init_val(self): self.total_size = None self.result = None self.curr_size = 0 async def __anext__(self, retry=1): if self.total_size is None: self.result = await self.es_client.search(self.config.indices, self.config.doc_type, scroll="1m", body=self.config.query_body) self.total_size = self.result['hits']['total'] self.total_size = self.config.max_limit if (self.config.max_limit and self.config.max_limit < self.result['hits']['total']) else self.total_size self.curr_size += len(self.result['hits']['hits']) logging.info("Get %d items from %s, percentage: %.2f%%" % (len(self.result['hits']['hits']), self.config.indices + "->" + self.config.doc_type, self.curr_size / self.total_size * 100)) return [i["_source"] for i in self.result['hits']['hits']] if self.config.return_source else self.result if "_scroll_id" in self.result and self.result["_scroll_id"] and self.curr_size < self.total_size: try: self.result = await self.es_client.scroll(scroll_id=self.result["_scroll_id"], scroll="1m") except Exception as e: if retry < self.config.max_retry: await asyncio.sleep(random.randint(self.config.random_min_sleep, self.config.random_max_sleep)) return await self.__anext__(retry+1) else: logging.error("Give up es getter, After retry: %d times, still fail to get result: %s" % (self.config.max_retry, str(e))) raise StopAsyncIteration self.curr_size += len(self.result['hits']['hits']) logging.info("Get %d items from %s, percentage: %.2f%%" % (len(self.result['hits']['hits']), self.config.indices + "->" + self.config.doc_type, self.curr_size / self.total_size * 100)) return [i["_source"] for i in self.result['hits']['hits']] if self.config.return_source else self.result self.init_val() logging.info("get source done: %s" % (self.config.indices + "->" + self.config.doc_type, )) raise StopAsyncIteration async def delete_all(self): """ inefficient delete """ body = { "query": { "match_all": {} } } result = await self.config.es_client.delete_by_query(index=self.config.indices, doc_type=self.config.doc_type, body=body) return result def __iter__(self): raise ValueError("ESGetter must be used with async generator, not normal generator") PK9dK)\N 7idataapi_transform/DataProcess/DataGetter/JsonGetter.pyimport json import logging from .BaseGetter import BaseGetter class JsonGetter(BaseGetter): def __init__(self, config): super().__init__(self) self.config = config self.responses = list() self.line_num = 0 self.curr_size = 0 self.need_clear = False self.done = False self.f_in = open(self.config.filename, self.config.mode, encoding=self.config.encoding) def init_val(self): self.responses = list() self.line_num = 0 self.curr_size = 0 self.need_clear = False self.done = False self.f_in.seek(0, 0) def __aiter__(self): return self async def __anext__(self): if self.need_clear: self.responses.clear() self.need_clear = False if self.done: logging.info("get source done: %s" % (self.config.filename,)) self.init_val() raise StopAsyncIteration for line in self.f_in: self.line_num += 1 try: json_obj = json.loads(line) except json.decoder.JSONDecodeError: logging.error("JSONDecodeError. give up. line: %d" % (self.line_num, )) continue self.responses.append(json_obj) self.curr_size += 1 if len(self.responses) > self.config.per_limit: self.need_clear = True return self.responses if self.config.max_limit and self.curr_size > self.config.max_limit: self.need_clear = self.done = True return self.responses self.need_clear = self.done = True if self.responses: return self.responses logging.info("get source done: %s" % (self.config.filename,)) self.init_val() raise StopAsyncIteration def __iter__(self): for line in self.f_in: self.line_num += 1 try: json_obj = json.loads(line) except json.decoder.JSONDecodeError: logging.error("JSONDecodeError. give up. line: %d" % (self.line_num, )) continue self.responses.append(json_obj) self.curr_size += 1 if len(self.responses) > self.config.per_limit: yield self.responses self.responses.clear() if self.config.max_limit and self.curr_size > self.config.max_limit: yield self.responses self.responses.clear() break if self.responses: yield self.responses self.init_val() logging.info("get source done: %s" % (self.config.filename,)) def __del__(self): self.f_in.close() PK1R#Li~ 7idataapi_transform/DataProcess/DataGetter/XLSXGetter.pyimport logging from openpyxl import load_workbook from .BaseGetter import BaseGetter class XLSXGetter(BaseGetter): def __init__(self, config): super().__init__() self.config = config self.wb = load_workbook(filename=self.config.filename, read_only=True) if not self.wb.worksheets: raise ValueError("Empty file: %s" % (self.config.filename, )) self.sheet = self.wb.worksheets[self.config.sheet_index] self.headers = self.generate_headers() self.max_row = self.sheet.max_row if self.config.max_limit and self.config.max_limit > self.max_row: self.max_row = self.config.max_limit + 1 # add first headers self.row_num = 0 self.responses = list() self.curr_size = 0 self.need_clear = False self.done = False def init_val(self): self.row_num = 0 self.responses = list() self.curr_size = 0 self.need_clear = False self.done = False def __aiter__(self): return self def __anext__(self): if self.need_clear: self.responses.clear() self.need_clear = False if self.done: logging.info("get source done: %s" % (self.config.filename,)) self.init_val() raise StopAsyncIteration while self.row_num < self.max_row: if self.row_num == 0: continue self.row_num += 1 row = self.get_row(self.row_num) self.responses.append(row) if len(self.responses) > self.config.per_limit: self.curr_size += len(self.responses) self.need_clear = True return self.responses if self.responses: self.need_clear = self.done = True return self.responses logging.info("get source done: %s" % (self.config.filename,)) self.init_val() raise StopAsyncIteration def generate_headers(self): keys = list() for index in range(self.sheet.max_column): cell_index = index + 1 keys.append(self.sheet._get_cell(1, cell_index).value) return keys def get_row(self, row_num): item = dict() for index in range(len(self.headers)): item[self.headers[index]] = self.sheet._get_cell(row_num, index+1).value return item def __iter__(self): for row_num in range(self.max_row): if row_num == 0: continue row_num += 1 row = self.get_row(row_num) self.responses.append(row) if len(self.responses) > self.config.per_limit: self.curr_size += len(self.responses) yield self.responses self.responses.clear() if self.responses: yield self.responses self.init_val() logging.info("get source done: %s" % (self.config.filename,)) def __del__(self): self.wb.close() PK2K5idataapi_transform/DataProcess/DataGetter/__init__.pyPK{KV7idataapi_transform/DataProcess/DataWriter/BaseWriter.pyimport abc from ..Meta.BaseDataProcess import BaseDataProcess class BaseWriter(BaseDataProcess, metaclass=abc.ABCMeta): @abc.abstractmethod def __init__(self, *args, **kwargs): """ :param config """ pass @abc.abstractmethod async def write(self, responses): pass @abc.abstractmethod def __enter__(self): pass @abc.abstractmethod def __exit__(self, exc_type, exc_val, exc_tb): pass PKKO O 6idataapi_transform/DataProcess/DataWriter/CSVWriter.pyimport csv import types import logging from .BaseWriter import BaseWriter class CSVWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.f_out = open(self.config.filename, self.config.mode, encoding=self.config.encoding, newline="") self.f_csv = None self.headers = dict() if not self.config.headers else self.config.headers self.total_miss_count = 0 self.success_count = 0 def write(self, responses): miss_count = 0 # filter if self.config.filter: new_result = list() for each_response in responses: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue new_result.append(each_response) responses = new_result self.total_miss_count += miss_count # all filtered if not responses: logging.info("%s write 0 item, filtered %d item" % (self.config.filename, miss_count)) return # expand if self.config.expand: responses = [self.expand_dict(i, max_expand=self.config.expand) for i in responses] else: responses = [i for i in responses] if isinstance(responses, types.GeneratorType) else responses # headers if not self.f_csv: if not self.headers: self.headers = self.generate_headers(responses) self.f_csv = csv.DictWriter(self.f_out, self.headers) self.f_csv.writeheader() # encoding process for each_response in responses: for k, v in each_response.items(): if v is None: each_response[k] = "" elif self.config.qsn and v != "" and (isinstance(v, (int, float)) or isinstance(v, str) and all(i.isdigit() for i in v)): each_response[k] = repr(str(v)) elif self.config.encoding not in ("utf8", "utf-8"): each_response[k] = str(v).encode(self.config.encoding, "ignore").decode(self.config.encoding) self.success_count += 1 self.f_csv.writerow(each_response) logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count)) @staticmethod def generate_headers(responses): headers = set() for r in responses: for key in r.keys(): headers.add(key) return list(headers) def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.f_out.close() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.filename, self.total_miss_count, self.success_count)) PK#Lq--5idataapi_transform/DataProcess/DataWriter/ESWriter.pyimport logging from .BaseWriter import BaseWriter from ..Config.MainConfig import main_config class ESWriter(BaseWriter): def __init__(self, config): if not main_config.has_es_configured: raise ValueError("You must config es_hosts before using ESWriter, Please edit configure file: %s" % (main_config.ini_path, )) super().__init__() self.config = config self.total_miss_count = 0 self.success_count = 0 async def write(self, responses): origin_length = len(responses) if self.config.filter: responses = [self.config.filter(i) for i in responses] responses = [i for i in responses if i] miss_count = origin_length - len(responses) self.total_miss_count += miss_count if responses: if self.config.expand: responses = [self.expand_dict(i) for i in responses] r = await self.config.es_client.add_dict_to_es(self.config.indices, self.config.doc_type, responses, self.config.id_hash_func, self.config.app_code) if r: self.success_count += len(responses) else: r = None logging.info("Write %d items to index: %s, doc_type: %s" % (len(responses) if r else 0, self.config.indices, self.config.doc_type)) return r async def delete_all(self): """ inefficient delete """ body = { "query": { "match_all": {} } } result = await self.config.es_client.delete_by_query(index=self.config.indices, doc_type=self.config.doc_type, body=body) return result def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): logging.info("%s->%s write done, total filtered %d item, total write %d item" % (self.config.indices, self.config.doc_type, self.total_miss_count, self.success_count)) PK |K ??7idataapi_transform/DataProcess/DataWriter/JsonWriter.pyimport json import logging from .BaseWriter import BaseWriter class JsonWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.total_miss_count = 0 self.success_count = 0 self.f_out = open(self.config.filename, self.config.mode, encoding=self.config.encoding) def write(self, responses): miss_count = 0 for each_response in responses: if self.config.expand: each_response = self.expand_dict(each_response, max_expand=self.config.expand) if self.config.filter: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue self.f_out.write(json.dumps(each_response) + self.config.new_line) self.success_count += 1 self.total_miss_count += miss_count logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count)) def __exit__(self, exc_type, exc_val, exc_tb): self.f_out.close() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.filename, self.total_miss_count, self.success_count)) def __enter__(self): return self PK |KA\UU6idataapi_transform/DataProcess/DataWriter/TXTWriter.pyimport logging from .BaseWriter import BaseWriter class TXTWriter(BaseWriter): def __init__(self, config): super().__init__() self.config = config self.f_out = open(config.filename, config.mode, encoding=config.encoding) self.total_miss_count = 0 self.success_count = 0 def write(self, responses): miss_count = 0 for each_response in responses: if self.config.expand: each_response = self.expand_dict(each_response, max_expand=self.config.expand) if self.config.filter: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue self.f_out.write(self.config.join_val.join(str(value) for value in each_response.values()) + self.config.new_line) self.success_count += 1 self.total_miss_count += miss_count logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count)) def __exit__(self, exc_type, exc_val, exc_tb): self.f_out.close() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.filename, self.total_miss_count, self.success_count)) def __enter__(self): return self PK|K97idataapi_transform/DataProcess/DataWriter/XLSXWriter.pyimport logging from openpyxl import Workbook from .BaseWriter import BaseWriter _warning = False class XLSXWriter(BaseWriter): def __init__(self, config): global _warning super().__init__() self.config = config self.wb = Workbook() self.ws1 = self.wb.active self.ws1.title = config.title self.col_dict = dict() self.row = 2 self.total_miss_count = 0 self.success_count = 0 if not _warning: logging.warning("XLSXWriter will actually write to file when __exit__ of XLSXWriter called") _warning = True def write(self, responses): miss_count = 0 for each_response in responses: if self.config.expand: each_response = self.expand_dict(each_response, max_expand=self.config.expand) if self.config.filter: each_response = self.config.filter(each_response) if not each_response: miss_count += 1 continue for key, value in each_response.items(): if key not in self.col_dict: self.col_dict[key] = len(self.col_dict) + 1 self.ws1.cell(row=1, column=self.col_dict[key], value=key) value = str(value) if value is not None else "" self.ws1.cell(row=self.row, column=self.col_dict[key], value=value) self.row += 1 self.success_count += 1 logging.info("%s write %d item, filtered %d item" % (self.config.filename, len(responses), miss_count)) def __exit__(self, exc_type, exc_val, exc_tb): self.wb.save(filename=self.config.filename) self.wb.close() logging.info("%s write done, total filtered %d item, total write %d item" % (self.config.filename, self.total_miss_count, self.success_count)) def __enter__(self): return self PKK5idataapi_transform/DataProcess/DataWriter/__init__.pyPKKB6idataapi_transform/DataProcess/Meta/BaseDataProcess.py class BaseDataProcess(object): @staticmethod def expand_dict(origin_item, max_expand=0, current_expand=0, parent_key=None, parent_item=None): if max_expand == 0: return origin_item if max_expand != -1 and current_expand >= max_expand: return origin_item if parent_key: if isinstance(origin_item, dict): for sub_k, sub_v in origin_item.items(): parent_item[parent_key + "_" + sub_k] = sub_v if parent_key in parent_item: del parent_item[parent_key] elif isinstance(origin_item, list): for item in origin_item: BaseDataProcess.expand_dict(item, max_expand, current_expand + 1, parent_key, parent_item) return origin_item keys = [k for k in origin_item.keys()] has_sub_dict = False for k in keys: if isinstance(origin_item[k], dict): has_sub_dict = True sub_dict = origin_item[k] for sub_k, sub_v in sub_dict.items(): origin_item[k + "_" + sub_k] = sub_v del origin_item[k] elif isinstance(origin_item[k], list): for item in origin_item[k]: BaseDataProcess.expand_dict(item, max_expand, current_expand + 1, k, origin_item) if has_sub_dict: return BaseDataProcess.expand_dict(origin_item, max_expand, current_expand + 1) else: return origin_item PKgK/idataapi_transform/DataProcess/Meta/__init__.pyPK!H8051idataapi_transform-0.5.dist-info/entry_points.txtN+I/N.,()*)J+N/ʵLI,IL,Ȍ Y&fqqPKK11(idataapi_transform-0.5.dist-info/LICENSEThe MIT License (MIT) Copyright (c) 2017 zpoint Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HxQP&idataapi_transform-0.5.dist-info/WHEEL1 0 RZq+D-Dv;_[*7Fp 8MRq%_:==ߘPT PK!HDn!)idataapi_transform-0.5.dist-info/METADATAmN0 ~u n"X71^뮖8$Fl0i*߿ Xag1 I@iph|-N'6YwZbv zhOc FBCDFl8Q[sK zF;Hc ,*ODSjq'kB^eٖCmƥ`MK2>VbQ$s*iq ~0%`'-ؑOW}ţij<=L:NvT "Mfd6b.=+}oPK!H>'b'idataapi_transform-0.5.dist-info/RECORDIH"9"(ʅ`W_?~=F"==*9YYiﷹ7~=$M_y^^}kW`|m${S. {vӫ͉L]IkyʟeP)$̆ʥRD*>,2 n Ú8\qXe]<"{r0+cӯP.sBAlje.~حM<t}}r8B!V6IL&}i$Oߗ:~Vx\֖i kˬ D516+0/lS9Z~9Rf BJCUUsdY^͎ӟB:܌!oxr$q-0'HiB}>QOr-Sm:!S2"E":pCo4w͋#Y(p½ SwAtMDK:  9Sv;8=nG*6S"&6Mj[Zv*ұl-wx]p$h"; R43526v-aEYW1{z; è>5n,k!LdŊlC}|?iA;Z]nee!{Cqg_WHܚP'~C~5!uQg4Ѐ0GX S w}un.Q] yfQRoQ< B-,?<-!<&/:^` `T'?{Y>iN f!9O!YqRfhv-\[a(8I[ț3:cmgգ>x_ U;4?ˑ7tw:uF+@Ԡyƅ6%5e `t;Omg[˜96u~%brfuwb4UX@#伇S7Lr&VG $Rt(Vx#lw=wd@%\ j,J*5ⰯO;roط(F0-~&۸? gqY`VK-%8z4 q"é:oǮ(3uvB)eGi,lSo!C -pglRMC]?q R e,ma^`NI Z,BDtKiCB-vi}6Ns8?PKP$LT4idataapi_transform/__init__.pyPK4W#L9"GGidataapi_transform/cli.pyPKsX#L6U= = 0Qidataapi_transform/DataProcess/ProcessFactory.pyPK狗K*(idataapi_transform/DataProcess/__init__.pyPKmUKN]8$)idataapi_transform/DataProcess/Config/ConnectorConfig.pyPKxO$L\ws5;.idataapi_transform/DataProcess/Config/DefaultValue.pyPK#L11idataapi_transform/DataProcess/Config/ESConfig.pyPK⌞K S2Aidataapi_transform/DataProcess/Config/LogConfig.pyPK5c#Leܒ 3Iidataapi_transform/DataProcess/Config/MainConfig.pyPK VK1Sidataapi_transform/DataProcess/Config/__init__.pyPKZK@>HTidataapi_transform/DataProcess/Config/ConfigUtil/BaseConfig.pyPK[#L'b'4idataapi_transform-0.5.dist-info/RECORDPK##3