PK!s,aioacm/__init__.pyfrom . import client from .client import ACMClient, ACMException, DEFAULTS, DEFAULT_GROUP_NAME __version__ = client.VERSION __all__ = ["ACMClient", "ACMException", "DEFAULTS", "DEFAULT_GROUP_NAME"] PK!+aioacm/client.py# coding: utf8 # Standard Library import hmac import json import time import base64 import asyncio import hashlib import logging from http import HTTPStatus from asyncio import iscoroutinefunction from functools import partial from urllib.error import URLError, HTTPError from urllib.parse import urlencode, unquote_plus # Non Standard Library from aiohttp import ClientError, ClientSession, ClientResponseError # Current Project from .files import read_file, save_file, delete_file from .params import is_valid, group_key, parse_key from .server import get_server_list from .commons import truncate, synchronized_with_attr logger = logging.getLogger("aioacm") DEBUG = False VERSION = "0.3.13" DEFAULT_GROUP_NAME = "DEFAULT_GROUP" DEFAULT_NAMESPACE = "" WORD_SEPARATOR = u'\x02' LINE_SEPARATOR = u'\x01' kms_available = False try: from aliyunsdkcore.client import AcsClient from aliyunsdkkms.request.v20160120.DecryptRequest import DecryptRequest from aliyunsdkkms.request.v20160120.EncryptRequest import EncryptRequest kms_available = True except ImportError: logger.info("Aliyun KMS SDK is not installed") ENCRYPTED_DATA_ID_PREFIX = "cipher-" DEFAULTS = { "APP_NAME": "ACM-SDK-Python", "TIMEOUT": 3, # in seconds "PULLING_TIMEOUT": 30, # in seconds "PULLING_CONFIG_SIZE": 3000, "CALLBACK_THREAD_NUM": 10, "FAILOVER_BASE": "acm-data/data", "SNAPSHOT_BASE": "acm-data/snapshot", "KMS_ENABLED": False, "REGION_ID": "", "KEY_ID": "", } OPTIONS = set(( "default_timeout", "tls_enabled", "auth_enabled", "cai_enabled", "pulling_timeout", "pulling_config_size", "callback_thread_num", "failover_base", "snapshot_base", "app_name", "kms_enabled", "region_id", "kms_ak", "kms_secret", "key_id", "no_snapshot" )) _FUTURES = [] class ACMException(Exception): pass class ACMRequestException(ACMException): pass def process_common_params(data_id, group): if not group or not group.strip(): group = DEFAULT_GROUP_NAME else: group = group.strip() if not data_id or not is_valid(data_id): raise ACMException("Invalid dataId.") if not is_valid(group): raise ACMException("Invalid group.") return data_id, group def parse_pulling_result(result): if not result: return list() ret = list() for i in unquote_plus(result).split(LINE_SEPARATOR): if not i.strip(): continue sp = i.split(WORD_SEPARATOR) if len(sp) < 3: sp.append("") ret.append(sp) return ret def is_encrypted(data_id): return data_id.startswith(ENCRYPTED_DATA_ID_PREFIX) class WatcherWrap: def __init__(self, key, callback): self.callback = callback self.last_md5 = None self.watch_key = key class CacheData: def __init__(self, key, client): self.key = key local_value = read_file(client.failover_base, key) or \ read_file(client.snapshot_base, key) self.content = local_value if isinstance(local_value, bytes): src = local_value.decode("utf8") else: src = local_value self.md5 = hashlib.md5(src.encode("GBK")).hexdigest() if src else None self.is_init = True if not self.md5: logger.getChild('init-cache').debug( "cache for %s does not have local value", key ) class ACMClient: """Client for ACM available API: * get * add_watcher * remove_watcher """ debug = False @staticmethod def set_debugging(): if not ACMClient.debug: handler = logging.StreamHandler() handler.setFormatter( logging.Formatter( "%(asctime)s %(levelname)s %(name)s:%(message)s" ) ) logger.addHandler(handler) logger.setLevel(logging.DEBUG) ACMClient.debug = True def __init__(self, endpoint, namespace=None, ak=None, sk=None): self.endpoint = endpoint self.namespace = namespace or DEFAULT_NAMESPACE or "" self.ak = ak self.sk = sk self.server_list = None self.server_list_lock = asyncio.Lock() self.current_server = None self.server_offset = 0 self.server_refresh_running = False self.watcher_mapping = dict() self.pulling_lock = asyncio.Lock() self.puller_mapping = None self.notify_queue = None self.callback_tread_pool = None self.process_mgr = None self.default_timeout = DEFAULTS["TIMEOUT"] self.tls_enabled = False self.auth_enabled = self.ak and self.sk self.cai_enabled = True self.pulling_timeout = DEFAULTS["PULLING_TIMEOUT"] self.pulling_config_size = DEFAULTS["PULLING_CONFIG_SIZE"] self.callback_tread_num = DEFAULTS["CALLBACK_THREAD_NUM"] self.failover_base = DEFAULTS["FAILOVER_BASE"] self.snapshot_base = DEFAULTS["SNAPSHOT_BASE"] self.app_name = DEFAULTS["APP_NAME"] self.kms_enabled = DEFAULTS["KMS_ENABLED"] self.region_id = DEFAULTS["REGION_ID"] self.key_id = DEFAULTS["KEY_ID"] self.kms_ak = self.ak self.kms_secret = self.sk self.kms_client = None self.no_snapshot = False logger.getChild('client-init').info( "endpoint:%s, tenant:%s", endpoint, namespace ) def set_options(self, **kwargs): logger = logger.getChild('set_options') for k, v in kwargs.items(): if k not in OPTIONS: logger.warning("unknown option:%s, ignored" % k) continue if k == "kms_enabled" and v and not kms_available: logger.warning("kms can not be turned on with no KMS SDK installed") continue logger.debug("key:%s, value:%s" % (k, v)) setattr(self, k, v) async def _refresh_server_list(self): logger = logger.getChild('refresh-server') async with self.server_list_lock: if self.server_refresh_running: logger.warning("task is running, aborting") return self.server_refresh_running = True while True: try: await asyncio.sleep(30) logger.debug("try to refresh server list") server_list = await get_server_list( self.endpoint, 443 if self.tls_enabled else 8080, self.cai_enabled ) logger.debug( "server_num:%s server_list:%s", len(server_list), server_list ) if not server_list: logger.error( "empty server_list get from %s, " "do not refresh", self.endpoint ) continue async with self.server_list_lock: self.server_list = server_list self.server_offset = 0 if self.current_server not in server_list: logger.warning( "%s is not effective, change one", str(self.current_server) ) self.current_server = server_list[self.server_offset] except Exception as e: logger.error("exception %s occur", exc_info=e) async def change_server(self): async with self.server_list_lock: self.server_offset = ( (self.server_offset + 1) % len(self.server_list) ) self.current_server = self.server_list[self.server_offset] async def get_server(self): logger = logger.getChild('get-server') if self.server_list is None: async with self.server_list_lock: logger.info( "server list is null, try to initialize" ) server_list = await get_server_list( self.endpoint, 443 if self.tls_enabled else 8080, self.cai_enabled ) if not server_list: logger.error( "empty server_list get from %s", self.endpoint ) return None self.server_list = server_list self.current_server = self.server_list[self.server_offset] logger.info( "server_num:%s server_list:%s", len(self.server_list), self.server_list ) if self.cai_enabled: future = asyncio.ensure_future( self._refresh_server_list() ) future.add_done_callback(partial(self.log_and_rerun_on_failure, self._refresh_server_list)) # close job than run in backgroud. _FUTURES.append(future) logger.info("use server:%s" % str(self.current_server)) return self.current_server async def remove(self, data_id, group, timeout=None): """ Remove one data item from ACM. :param data_id: dataId. :param group: group, use "DEFAULT_GROUP" if no group specified. :param timeout: timeout for requesting server in seconds. :return: True if success or an exception will be raised. """ logger = logger.getChild("remove") data_id, group = process_common_params(data_id, group) logger.info( "data_id:%s, group:%s, namespace:%s, timeout:%s" % (data_id, group, self.namespace, timeout)) params = { "dataId": data_id, "group": group, } if self.namespace: params["tenant"] = self.namespace try: resp = await self._do_sync_req("/diamond-server/datum.do?method=deleteAllDatums", None, None, params, 'POST', timeout or self.default_timeout) logger.info("success to remove group:%s, data_id:%s, server response:%s" % ( group, data_id, resp)) return True except ClientResponseError as e: if e.code == HTTPStatus.FORBIDDEN: logger.error( "no right for namespace:%s, group:%s, data_id:%s" % (self.namespace, group, data_id)) raise ACMException("Insufficient privilege.") else: logger.error("error code [:%s] for namespace:%s, group:%s, data_id:%s" % ( e.code, self.namespace, group, data_id)) raise ACMException("Request Error, code is %s" % e.code) except Exception as e: logger.exception("exception %s occur" % str(e)) raise # cache_key = group_key(data_id, group, self.namespace) # delete_file(self.snapshot_base, cache_key) # return True async def publish(self, data_id, group, content, timeout=None): """ Publish one data item to ACM. If the data key is not exist, create one first. If the data key is exist, update to the content specified. Content can not be set to None, if there is need to delete config item, use function **remove** instead. :param data_id: dataId. :param group: group, use "DEFAULT_GROUP" if no group specified. :param content: content of the data item. :param timeout: timeout for requesting server in seconds. :return: True if success or an exception will be raised. """ logger = logger.getChild('publish') if content is None: raise ACMException("Can not publish none content, use remove instead.") data_id, group = process_common_params(data_id, group) if isinstance(content, bytes): content = content.decode("utf8") if is_encrypted(data_id) and self.kms_enabled: content = self.encrypt(content) logger.info("data_id:%s, group:%s, namespace:%s, content:%s, timeout:%s" % ( data_id, group, self.namespace, truncate(content), timeout)) params = { "dataId": data_id, "group": group, "content": content.encode("GBK"), } if self.namespace: params["tenant"] = self.namespace try: resp = await self._do_sync_req("/diamond-server/basestone.do?method=syncUpdateAll", None, None, params, 'POST', timeout or self.default_timeout) logger.info("success to publish content, group:%s, data_id:%s, server response:%s" % ( group, data_id, resp)) return True except ClientResponseError as e: if e.code == HTTPStatus.FORBIDDEN: logger.error( "no right for namespace:%s, group:%s, data_id:%s" % (self.namespace, group, data_id)) raise ACMException("Insufficient privilege.") else: logger.error("error code [:%s] for namespace:%s, group:%s, data_id:%s" % ( e.code, self.namespace, group, data_id)) raise ACMException("Request Error, code is %s" % e.code) except Exception as e: logger.exception("exception %s occur" % str(e)) raise async def get(self, data_id, group, timeout=None, no_snapshot=False): content = await self.get_raw(data_id, group, timeout, no_snapshot) if content and is_encrypted(data_id) and self.kms_enabled: return self.decrypt(content) return content async def get_raw(self, data_id, group, timeout=None, no_snapshot=False): """Get value of one config item. Query priority: 1. Get from local failover dir(default: "{cwd}/acm/data"). Failover dir can be manually copied from snapshot dir(default: "{cwd}/acm/snapshot") in advance. This helps to suppress the effect of known server failure. 2. Get from one server until value is got or all servers tried. Content will be save to snapshot dir. 3. Get from snapshot dir. :param data_id: dataId. :param group: group, use "DEFAULT_GROUP" if no group specified. :param timeout: timeout for requesting server in seconds. :param no_snapshot: do not save snapshot. :return: value. """ logger = logger.getChild('get-config') no_snapshot = self.no_snapshot if no_snapshot is None else no_snapshot data_id, group = process_common_params(data_id, group) logger.info( "data_id:%s, group:%s, namespace:%s, timeout:%s", data_id, group, self.namespace, timeout ) params = { "dataId": data_id, "group": group, } if self.namespace: params["tenant"] = self.namespace cache_key = group_key(data_id, group, self.namespace) # get from failover content = read_file(self.failover_base, cache_key) if content is None: logger.debug( "failover config is not exist for %s, " "try to get from server", cache_key ) else: logger.debug( "get %s from failover directory, content is %s", cache_key, truncate(content) ) return content # get from server try: content = await self._do_sync_req( "/diamond-server/config.co", None, params, None, 'GET', timeout or self.default_timeout ) except ClientResponseError as e: if e.code == HTTPStatus.NOT_FOUND: logger.warning( "config not found for data_id:%s, group:%s, " "namespace:%s, try to delete snapshot", data_id, group, self.namespace ) delete_file(self.snapshot_base, cache_key) return None elif e.code == HTTPStatus.CONFLICT: logger.error( "config being modified concurrently for " "data_id:%s, group:%s, namespace:%s", data_id, group, self.namespace ) elif e.code == HTTPStatus.FORBIDDEN: logger.error( "no right for data_id:%s, group:%s, " "namespace:%s", data_id, group, self.namespace ) raise ACMException("Insufficient privilege.") else: logger.error( "error code [:%s] for data_id:%s, group:%s, " "namespace:%s", e.code, data_id, group, self.namespace ) if no_snapshot: raise except ACMException as e: logger.error("acm exception: %s" % str(e)) except Exception as e: logger.exception("exception %s occur" % str(e)) if no_snapshot: raise if no_snapshot: return content if content is not None: logger.info( "content from server:%s, data_id:%s, group:%s, " "namespace:%s, try to save snapshot", truncate(content), data_id, group, self.namespace ) try: save_file(self.snapshot_base, cache_key, content) except Exception as e: logger.exception( "save snapshot failed for %s, data_id:%s, " "group:%s, namespace:%s", data_id, group, self.namespace, str(e) ) return content logger.error( "get config from server failed, try snapshot, " "data_id:%s, group:%s, namespace:%s", data_id, group, self.namespace ) content = read_file(self.snapshot_base, cache_key) if content is None: logger.warning( "snapshot is not exist for %s.", cache_key ) else: logger.debug( "get %s from snapshot directory, content is %s", cache_key, truncate(content) ) return content async def list(self, page=1, size=200): """ Get config items of current namespace with content included. Data is directly from acm server. :param page: which page to query, starts from 1. :param size: page size. :return: """ logger = logger.getChild("list") logger.info("try to list namespace:%s" % self.namespace) params = { "pageNo": page, "pageSize": size, "method": "getAllConfigByTenant", } if self.namespace: params["tenant"] = self.namespace try: d = await self._do_sync_req("/diamond-server/basestone.do", None, params, None, 'GET', self.default_timeout) if isinstance(d, bytes): d = d.decode("utf8") return json.loads(d) except ClientResponseError as e: if e.code == HTTPStatus.FORBIDDEN: logger.error("no right for namespace:%s" % self.namespace) raise ACMException("Insufficient privilege.") else: logger.error("[list] error code [%s] for namespace:%s" % (e.code, self.namespace)) raise ACMException("Request Error, code is %s" % e.code) except Exception as e: logger.exception("exception %s occur" % str(e)) raise async def list_all(self, group=None, prefix=None): """ Get all config items of current namespace, with content included. Warning: If there are lots of config in namespace, this function may cost some time. :param group: only dataIds with group match shall be returned. :param prefix: only dataIds startswith prefix shall be returned **it's case sensitive**. :return: """ logger = .logger.getChild("list-all") logger.info("namespace:%s, group:%s, prefix:%s" % (self.namespace, group, prefix)) def matching(ori): return (group is None or ori["group"] == group) and (prefix is None or ori["dataId"].startswith(prefix)) result = await self.list(1, 200) if not result: logger.warning("can not get config items of %s" % self.namespace) return list() ret_list = [{"dataId": i["dataId"], "group": i["group"]} for i in result["pageItems"] if matching(i)] pages = result["pagesAvailable"] logger.debug("%s items got from acm server" % result["totalCount"]) for i in range(2, pages + 1): result = await self.list(i, 200) ret_list += [{"dataId": j["dataId"], "group": j["group"]} for j in result["pageItems"] if matching(j)] logger.debug("%s items returned" % len(ret_list)) return ret_list @synchronized_with_attr("pulling_lock") def add_watcher(self, data_id, group, cb): self.add_watchers(data_id, group, [cb]) @synchronized_with_attr("pulling_lock") def add_watchers(self, data_id, group, cb_list): """Add watchers to specified item. 1. Callback is invoked from current process concurrently by thread pool. 2. Callback is invoked at once if the item exists. 3. Callback is invoked if changes or deletion detected on the item. :param data_id: dataId. :param group: group, use "DEFAULT_GROUP" if no group specified. :param cb_list: callback functions. :return: """ logger = logger.getChild("add-watcher") if not cb_list: raise ACMException("A callback function is needed.") data_id, group = process_common_params(data_id, group) logger.info( "data_id:%s, group:%s, namespace:%s", data_id, group, self.namespace ) cache_key = group_key(data_id, group, self.namespace) wl = self.watcher_mapping.get(cache_key) if not wl: wl = list() self.watcher_mapping[cache_key] = wl for cb in cb_list: wl.append(WatcherWrap(cache_key, cb)) if hasattr(cb, '__name__'): cb_name = cb.__name__ elif hasattr(cb, 'func'): cb_name = cb.func.__name__ else: cb_name = str(cb) logger.info( "watcher has been added for key:%s, " "new callback is:%s, callback number is:%s", cache_key, cb_name, len(wl) ) if self.puller_mapping is None: logger.debug("pulling should be initialized") self._int_pulling() def callback(): if cache_key in self.puller_mapping: logger.debug( "key:%s is already in pulling", cache_key ) return for key, puller_info in self.puller_mapping.items(): if len(puller_info[1]) < self.pulling_config_size: logger.debug( "puller:%s is available, add key:%s", puller_info[0], cache_key ) puller_info[1].append(cache_key) self.puller_mapping[cache_key] = puller_info break else: logger.debug( "no puller available, " "new one and add key:%s", cache_key ) # NOTE: Is this `key_list` correct? key_list = [] key_list.append(cache_key) puller = asyncio.ensure_future( self._do_pulling(key_list, self.notify_queue) ) self.puller_mapping[cache_key] = (puller, key_list) puller.add_done_callback( partial( self.log_and_update_puller_on_failure, self._do_pulling, key_list, self.notify_queue, cache_key=cache_key ) ) asyncio.get_event_loop().call_soon(callback) def log_and_update_puller_on_failure(self, coro, *args, **kwargs): logger = logger.getChild("callback") future = args[-1] exc = future.exception() if exc: logger.error('Exception happened on future', exc_info=exc) args = args[:-1] cache_key = kwargs.pop('cache_key') new_future = asyncio.ensure_future(coro(*args, **kwargs)) self.puller_mapping[cache_key] = (new_future, self.puller_mapping[cache_key][1]) new_future.add_done_callback( partial( self.log_and_rerun_on_failure, coro, *args, **kwargs, cache_key=cache_key) ) @synchronized_with_attr("pulling_lock") def remove_watcher(self, data_id, group, cb, remove_all=False): """Remove watcher from specified key. :param data_id: data_id. :param group: group, use "DEFAULT_GROUP" if no group specified. :param cb: callback function. :param remove_all: weather to remove all occurrence of the callback or just once. :return: """ logger = logger.getChild("remove-watcher") if not cb: raise ACMException("A callback function is needed.") data_id, group = process_common_params(data_id, group) if not self.puller_mapping: logger.warning("[remove-watcher] watcher is never started.") return cache_key = group_key(data_id, group, self.namespace) wl = self.watcher_mapping.get(cache_key) if not wl: logger.warning( "[remove-watcher] there is no watcher on key:%s", cache_key ) return wrap_to_remove = list() for i in wl: if i.callback == cb: wrap_to_remove.append(i) if not remove_all: break for i in wrap_to_remove: wl.remove(i) if hasattr(cb, '__name__'): cb_name = cb.__name__ elif hasattr(cb, 'func'): cb_name = cb.func.__name__ else: cb_name = str(cb) logger.info( "[remove-watcher] %s is removed from %s, remove all:%s", cb_name, cache_key, remove_all ) if not wl: logger.debug( "[remove-watcher] there is no watcher for:%s, " "kick out from pulling", cache_key ) self.watcher_mapping.pop(cache_key) puller_info = self.puller_mapping[cache_key] puller_info[1].remove(cache_key) if not puller_info[1]: logger.debug( "[remove-watcher] there is no pulling keys for puller:%s, " "stop it", puller_info[0] ) self.puller_mapping.pop(cache_key) puller_info[0].cancel() async def _do_sync_req(self, url: str, headers: dict = None, params: dict = None, data: str = None, method: str = 'get', timeout: int = None): logger = logger.getChild("do-sync-req") # url = "?".join([url, urlencode(params)]) if params else url all_headers = self._get_common_headers(params, data) if headers: all_headers.update(headers) logger.debug( "url:%s, headers:%s, params:%s, data:%s, timeout:%s", url, all_headers, params, data, timeout ) tries = 0 while True: try: server_info = await self.get_server() if not server_info: logger.error("can not get one server.") raise ACMException("Server is not available.") address, port, is_ip_address = server_info server = ":".join([address, str(port)]) # if tls is enabled and server address is in ip, # turn off verification. server_url = "%s://%s%s" % ( "https" if self.tls_enabled else "http", server, url ) async with ClientSession() as request: if method.upper() == 'POST': if data and not isinstance(data, bytes): data = urlencode(data, encoding='GBK').encode() request_ctx = request.post( server_url, headers=all_headers, params=params, data=data, timeout=timeout ) else: request_ctx = request.get( server_url, headers=all_headers, params=params, timeout=timeout ) async with request_ctx as resp: resp.raise_for_status() text = await resp.text() if resp.status > 300: raise HTTPError(server_url, resp.status, resp.reason, all_headers, None) logger.debug( "info from server:%s", server ) return text except HTTPError as e: if e.code in [ HTTPStatus.INTERNAL_SERVER_ERROR, HTTPStatus.BAD_GATEWAY, HTTPStatus.SERVICE_UNAVAILABLE ]: logger.warning( "server:%s is not available for " "reason:%s", server, e.msg ) else: raise except asyncio.TimeoutError: logger.warning("%s request timeout", server) except ClientError as exc: logger.warning( "%s request error. %s", server, exc ) except URLError as e: logger.warning( "%s connection error:%s", server, e.reason ) tries += 1 if tries >= len(self.server_list): logger.error( "%s maybe down, no server is currently " "available", server ) raise ACMRequestException("All server are not available") await self.change_server() logger.warning("%s maybe down, skip to next", server) async def _do_pulling(self, cache_list: list, queue: asyncio.Queue): logger = logger.getChild("do-pulling") cache_pool = dict() for cache_key in cache_list: cache_pool[cache_key] = CacheData(cache_key, self) while cache_list: unused_keys = set(cache_pool.keys()) contains_init_key = False probe_update_string = "" for cache_key in cache_list: cache_data = cache_pool.get(cache_key) if not cache_data: logger.debug("new key added: %s" % cache_key) cache_data = CacheData(cache_key, self) cache_pool[cache_key] = cache_data if cache_data.is_init: contains_init_key = True data_id, group, namespace = parse_key(cache_key) probe_update_string += WORD_SEPARATOR.join([ data_id, group, cache_data.md5 or "", self.namespace ]) probe_update_string += LINE_SEPARATOR try: unused_keys.remove(cache_key) except KeyError: pass for k in unused_keys: logger.debug( "%s is no longer watched, remove from cache", k ) cache_pool.pop(k) logger.debug( "try to detected change from server probe " "string is %s", truncate(probe_update_string) ) headers = { "longPullingTimeout": str(int(self.pulling_timeout * 1000)) } if contains_init_key: headers["longPullingNoHangUp"] = "true" data = {"Probe-Modify-Request": probe_update_string} changed_keys = list() try: resp = await self._do_sync_req( "/diamond-server/config.co", headers, None, data, 'POST', self.pulling_timeout + 10 ) changed_keys = [ group_key(*i) for i in parse_pulling_result(resp) ] logger.debug( "following keys are changed from server %s", truncate(str(changed_keys)) ) except ACMException as e: logger.exception("acm exception: %s" % str(e)) except Exception as e: logger.error( "exception %s occur, return empty list", str(e) ) for cache_key, cache_data in cache_pool.items(): cache_data.is_init = False if cache_key in changed_keys: data_id, group, namespace = parse_key(cache_key) content = await self.get(data_id, group) if content is not None: md5 = hashlib.md5(content.encode("GBK")).hexdigest() else: md5 = None cache_data.md5 = md5 cache_data.content = content await queue.put( (cache_key, cache_data.content, cache_data.md5) ) @synchronized_with_attr("pulling_lock") def _int_pulling(self): logger = logger.getChild("init-pulling") if self.puller_mapping is not None: logger.info("puller is already initialized") return self.puller_mapping = dict() self.notify_queue = asyncio.Queue() future = asyncio.ensure_future(self._process_polling_result()) future.add_done_callback(partial(self.log_and_rerun_on_failure, self._process_polling_result)) logger.info("init completed") async def _process_polling_result(self): logger = logger.getChild("process-polling-result") while True: cache_key, content, md5 = await self.notify_queue.get() logger.debug( "receive an event:%s", cache_key ) wl = self.watcher_mapping.get(cache_key) if not wl: logger.warning( "no watcher on %s, ignored", cache_key ) continue data_id, group, namespace = parse_key(cache_key) params = { "data_id": data_id, "group": group, "namespace": namespace, "content": content } for watcher in wl: if not watcher.last_md5 == md5: cb = watcher.callback if hasattr(cb, '__name__'): cb_name = cb.__name__ elif hasattr(cb, 'func'): cb_name = cb.func.__name__ else: cb_name = str(cb) logger.debug( "md5 changed since last " "call, calling %s", cb_name ) try: if iscoroutinefunction(watcher.callback): await watcher.callback(params) else: watcher.callback(params) except Exception as e: logger.exception( "exception %s occur " "while calling %s ", str(e), cb_name ) watcher.last_md5 = md5 def _get_common_headers(self, params, data): headers = { "Diamond-Client-AppName": self.app_name, "Client-Version": VERSION, "exConfigInfo": "true", } if data: headers["Content-Type"] = "application/x-www-form-urlencoded; charset=GBK" if self.auth_enabled: ts = str(int(time.time() * 1000)) headers.update({ "Spas-AccessKey": self.ak, "timeStamp": ts, }) sign_str = "" # in case tenant or group is null if not params and not data: return headers tenant = (params and params.get("tenant")) or (data and data.get("tenant")) group = (params and params.get("group")) or (data and data.get("group")) if tenant: sign_str = tenant + "+" if group: sign_str = sign_str + group + "+" if sign_str: sign_str += ts headers["Spas-Signature"] = ( base64.encodebytes( hmac.new( self.sk.encode(), sign_str.encode(), digestmod=hashlib.sha1 ) .digest() ) .decode() .strip() ) return headers def _prepare_kms(self): if not (self.region_id and self.kms_ak and self.kms_secret): return False if not self.kms_client: self.kms_client = AcsClient(ak=self.kms_ak, secret=self.kms_secret, region_id=self.region_id) return True def encrypt(self, plain_txt): if not self._prepare_kms(): return plain_txt ssl._create_default_https_context = ssl._create_unverified_context req = EncryptRequest() req.set_KeyId(self.key_id) req.set_Plaintext(plain_txt if type(plain_txt) == bytes else plain_txt.encode("utf8")) resp = json.loads(self.kms_client.do_action_with_exception(req).decode("utf8")) return resp["CiphertextBlob"] def decrypt(self, cipher_blob): if not self._prepare_kms(): return cipher_blob ssl._create_default_https_context = ssl._create_unverified_context req = DecryptRequest() req.set_CiphertextBlob(cipher_blob) resp = json.loads(self.kms_client.do_action_with_exception(req).decode("utf8")) return resp["Plaintext"] def log_and_rerun_on_failure(self, coro, *args, **kwargs): logger = logger.getChild('callback') future = args[-1] exc = future.exception() if exc: logger.error('Exception happened on future', exc_info=exc) args = args[:-1] new_future = asyncio.ensure_future(coro(*args, **kwargs)) new_future.add_done_callback( partial( self.log_and_rerun_on_failure, coro, *args, **kwargs) ) if DEBUG: ACMClient.set_debugging() PK!aaioacm/command.py# coding: utf8 import os.path import sys import json import fcntl import shutil import asyncio import gettext import zipfile import argparse from datetime import datetime from aioacm import ACMClient, DEFAULT_GROUP_NAME # override the default expression of "positional arguments" def translate_patch(msg): return "required arguments" if msg == "positional arguments" else msg gettext.gettext = translate_patch ### DEFAULT_ENDPOINT = "acm.aliyun.com" DEFAULT_PORT = 8080 CMD = set(["bind", "set", "use"]) CONF = os.path.join(os.getenv("HOME"), ".acm.json") INIT_CONF = { "endpoints": { DEFAULT_ENDPOINT: { "tls": False, "is_current": True, "region_id": None, "kms_enabled": False, "namespaces": { "[default]": { "is_current": True, "ak": None, "sk": None, "alias": "[default]", "kms_ak": None, "kms_secret": None, "key_id": None, "updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), } } } } } def _colored(txt, color="green"): cm = { "green": 32, "red": 31, "yellow": 33, "grey": 30, } return "\033[1;%sm%s\033[0m" % (cm[color], txt) def read_config(): try: if not os.path.exists(CONF): write_config(INIT_CONF) if sys.version_info[0] == 3: with open(CONF, "r+", newline="") as f: fcntl.flock(f, fcntl.LOCK_EX) return json.loads(f.read()) else: with open(CONF, "r+") as f: fcntl.flock(f, fcntl.LOCK_EX) return json.loads(f.read()) except Exception as e: print("Read config error due to %s" % str(e)) sys.exit(1) def write_config(config): try: content = json.dumps(config, indent=4) with open(CONF, "wb") as f: fcntl.flock(f, fcntl.LOCK_EX) f.write(content if type(content) == bytes else content.encode("utf8")) except Exception as e: print("Save config error due to %s" % str(e)) sys.exit(1) def _get_current(config): endpoint = "" namespace = "" for k, v in config["endpoints"].items(): if not v["is_current"]: continue endpoint = k for k2, v2 in v["namespaces"].items(): if not v2["is_current"]: continue namespace = k2 break return endpoint, namespace def _set_current(config, endpoint, namespace=None): for k, v in config["endpoints"].items(): if k == endpoint: v["is_current"] = True if namespace is not None: for k2, v2 in v["namespaces"].items(): if k2 == namespace: v2["is_current"] = True else: v2["is_current"] = False else: v["is_current"] = False return config async def add(args): if ":" in args.namespace: pos = args.namespace.index(":") e = args.namespace[:pos] ns = args.namespace[pos + 1:] else: e = DEFAULT_ENDPOINT ns = args.namespace config = read_config() alias = args.alias or ns if args.alias is not None and ":" in args.alias: print('":" is invalid symbol in alias.') sys.exit(1) # detect alias, ensure unique globally for ep, ep_info in config["endpoints"].items(): for k, v in ep_info["namespaces"].items(): if args.alias is None and v["alias"] == alias and (k != ns or ep != e): alias = "-".join([e, ns]) elif v["alias"] == alias and k != ns: print("Alias %s has been taken by %s:%s, choose another one." % (_colored(alias, "red"), ep, k)) sys.exit(1) # new endpoint if e not in config["endpoints"]: if args.kms: if not args.region_id: print(_colored("Region ID", "red") + " must be specified to use KMS.") sys.exit(1) config["endpoints"][e] = { "tls": args.tls, "is_current": False, "region_id": args.region_id, "kms_enabled": args.kms, "namespaces": {} } print( "Adding a new endpoint: %s, using TLS is %s.\n" % (_colored(e, "yellow"), _colored(args.tls, "yellow"))) else: endpoint = config["endpoints"][e] if args.kms and not args.region_id and not endpoint.get("region_id"): print(_colored("Region ID", "red") + " must be specified to use KMS.") sys.exit(1) if endpoint.get("tls") != args.tls: endpoint["tls"] = args.tls print("TLS attr of %s has changed to %s.\n" % (_colored(e, "yellow"), _colored(args.tls, "yellow"))) if endpoint.get("kms_enabled") != args.kms: endpoint["kms_enabled"] = args.kms print("KMS enabled of %s has turned to %s.\n" % (_colored(e, "yellow"), _colored(args.kms, "yellow"))) if args.region_id is not None: if endpoint.get("region_id") != args.region_id: endpoint["region_id"] = args.region_id print("Region ID of %s has changed to %s.\n" % ( _colored(e, "yellow"), _colored(args.region_id, "yellow"))) if ns in config["endpoints"][e]["namespaces"]: namespace = config["endpoints"][e]["namespaces"][ns] if args.ak is not None: namespace["ak"] = args.ak if args.sk is not None: namespace["sk"] = args.sk if args.alias is not None: namespace["alias"] = alias if args.kms_ak is not None: namespace["kms_ak"] = args.kms_ak if args.kms_secret is not None: namespace["kms_secret"] = args.kms_secret if args.key_id is not None: namespace["key_id"] = args.key_id if args.kms: if not namespace.get("kms_ak"): if not namespace.get("ak"): print(_colored("AccessKey", "red") + ' must be specified to use KMS.') sys.exit(1) namespace["kms_ak"] = namespace.get("ak") if not namespace.get("kms_secret"): if not namespace.get("sk"): print(_colored("SecretKey", "red") + ' must be specified to use KMS.') sys.exit(1) namespace["kms_secret"] = namespace.get("sk") namespace["updated"] = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print("Namespace %s is already exist in %s, updating configs.\n" % ( _colored(ns, "green"), _colored(e, "yellow"))) else: namespace = {"ak": args.ak, "sk": args.sk, "alias": alias, "is_current": False, "updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "kms_ak": None, "kms_secret": None, "key_id": None} if args.kms: kms_ak = args.kms_ak or args.ak if not kms_ak: print(_colored("AccessKey", "red") + ' must be specified to use KMS.') sys.exit(1) kms_secret = args.kms_secret or args.sk if not kms_secret: print(_colored("SecretKey", "red") + ' must be specified to use KMS.') sys.exit(1) namespace["kms_ak"] = kms_ak namespace["kms_secret"] = kms_secret namespace["key_id"] = args.key_id config["endpoints"][e]["namespaces"][ns] = namespace print( "Add new namespace %s(%s) to %s.\n" % (_colored(ns, "green"), _colored(alias, "green"), _colored(e, "yellow"))) write_config(config) try: print("Try to access the namespace...") c = ACMClient(endpoint=e, namespace=(None if ns == "[default]" else ns), ak=config["endpoints"][e]["namespaces"][ns]["ak"], sk=config["endpoints"][e]["namespaces"][ns]["sk"]) if config["endpoints"][e]["tls"]: c.set_options(tls_enabled=True) await c.list(1, 1) print("Namespace access succeed.") except: print(_colored("\nWarning: Access test failed, there may be mistakes in configuration.\n", "grey")) def use(args): config = read_config() if ":" in args.namespace: pos = args.namespace.index(":") e = args.namespace[:pos] ns = args.namespace[pos + 1:] else: e = None ns = None found = False # detect alias, ensure unique globally for ep, ep_info in config["endpoints"].items(): for k, v in ep_info["namespaces"].items(): if v["alias"] == args.namespace or (k == ns and ep == e): _set_current(config, ep, k) print("Namespace changed to %s alias:%s.\n" % ( _colored("%s:%s" % (ep, k), "green"), _colored(v["alias"], "green"))) write_config(config) found = True break if found: break else: print("No namespace named or aliased as %s, please check.\n" % _colored(args.namespace, "red")) def _process_namespace(args): config = read_config() if args.namespace is not None: if ":" in args.namespace: pos = args.namespace.index(":") e = args.namespace[:pos] ns = args.namespace[pos + 1:] else: e = None ns = None for ep, ep_info in config["endpoints"].items(): for k, v in ep_info["namespaces"].items(): if v["alias"] == args.namespace or (k == ns and ep == e): return ep, ep_info, k, v print("No namespace named or aliased as %s, please check.\n" % _colored(args.namespace, "red")) sys.exit(1) e, n = _get_current(config) return e, config["endpoints"][e], n, config["endpoints"][e]["namespaces"][n] async def list_conf(args): e, ep, n, ns = _process_namespace(args) c = ACMClient(endpoint=e, namespace=(None if n == "[default]" else n), ak=ns["ak"], sk=ns["sk"]) if ep["tls"]: c.set_options(tls_enabled=True) try: configs = await c.list_all(args.group, args.prefix) except: print("List failed.") sys.exit(1) for i in sorted(configs, key=lambda x: x["group"] + x["dataId"]): print("%(group)s/%(dataId)s" % i) def _write_file(file, content): try: with open(file, "wb") as f: f.write(content if type(content) == bytes else content.encode("utf8")) except Exception as e: print("Write file error due to %s" % str(e)) sys.exit(1) def _read_file(file): try: if sys.version_info[0] == 3: with open(file, "r+", newline="") as f: return f.read() else: with open(file, "r+") as f: return f.read() except Exception as e: print("Read file error due to %s" % str(e)) sys.exit(1) await def pull(args): e, ep, n, ns = _process_namespace(args) c = _get_client(e, ep, n, ns) try: if "/" in args.data_id: g, d = args.data_id.split("/") content = await c.get(d, g, no_snapshot=True) else: content = await c.get(args.data_id, None, no_snapshot=True) except: print("Pull %s failed." % args.data_id) sys.exit(1) if content is None: print("%s does not exist." % _colored(args.data_id, "red")) sys.exit(1) os.write(1, content.encode("utf8")) async def push(args): if args.file: if not sys.stdin.isatty(): print(_colored("Warning: content from stdin will be ignored since file is specified.", "grey")) if not os.path.exists(args.file): print("File %s does not exist." % _colored(args.file, "red")) sys.exit(1) content = _read_file(args.file) elif not sys.stdin.isatty(): content = sys.stdin.read() else: print("Use file or stdin as input.") sys.exit(1) e, ep, n, ns = _process_namespace(args) c = _get_client(e, ep, n, ns) if args.data_id.count("/") > 1: print("Invalid dataId or filename, more than one / is given.") sys.exit(1) group, data_id = args.data_id.split("/") if "/" in args.data_id else (None, args.data_id) try: await c.publish(data_id, group, content) except: import traceback traceback.print_exc() print("Push %s failed." % args.data_id) sys.exit(1) print("content has been pushed to [dataId:%s].\n" % (_colored(args.data_id, "green"))) def current(args): config = read_config() e, n = _get_current(config) print("Current Endpoint:\t%s, using TLS is %s, using KMS is %s, Region ID is %s." % ( _colored(e, "yellow"), _colored(config["endpoints"][e].get("tls"), "yellow"), _colored(config["endpoints"][e].get("kms_enabled"), "yellow"), _colored(config["endpoints"][e].get("region_id"), "yellow"))) print("\nCurrent Namespace:\t%s" % (_colored(n))) ns = config["endpoints"][e]["namespaces"][n] print("\tAlias:\t\t%s" % _colored(ns["alias"], "green")) print("\tAccessKey:\t%s" % ns["ak"]) print("\tSecretKey:\t%s" % ns["sk"]) if config["endpoints"][e].get("kms_enabled"): print("\tAccessKey for KMS:\t%s" % ns.get("kms_ak")) print("\tSecretKey for KMS:\t%s" % ns.get("kms_secret")) print("\tKey ID:\t%s" % ns.get("key_id")) print("") def show(args): config = read_config() e, n = _get_current(config) max_ep = 10 max_ns = 10 max_alias = 10 table_header = ["", "ENDPOINT", "NAMESPACE_ID", "ALIAS", "UPDATED"] table_data = list() for k, v in config["endpoints"].items(): if len(k) > max_ep: max_ep = len(k) start = True for k2, v2 in v["namespaces"].items(): if k == e and k2 == n: row_data = ["*"] else: row_data = [""] if start: row_data.append(k) start = False else: row_data.append(k) if len(k2) > max_ns: max_ns = len(k2) if len(v2["alias"]) > max_alias: max_alias = len(v2["alias"]) row_data.append(k2) row_data.append(v2["alias"]) row_data.append(v2.get("updated", "None")) table_data.append(row_data) table_data = sorted(table_data, key=lambda x: x[4], reverse=True) ptn = "%%-3s%%-%is%%-%is%%-%is%%-20s" % (max_ep + 5, max_ns + 5, max_alias + 5) print(ptn % tuple(table_header)) print("-" * (max_ep + max_ns + max_alias + 38)) for row in table_data: print(ptn % tuple(row)) print("-" * (max_ep + max_ns + max_alias + 38)) print("") async def export(args): e, ep, n, ns = _process_namespace(args) c = _get_client(e, ep, n, ns) try: configs = await c.list_all() except: print("Get config list failed.") sys.exit(1) groups = set() elements = set() for i in configs: groups.add(i["group"]) elements.add(os.path.join(i["group"], i["dataId"]) if i["group"] != DEFAULT_GROUP_NAME else i["dataId"]) dest_file = args.file or "%s-%s.zip" % (e, n) zip_file = None if args.dir: try: os.makedirs(args.dir) except OSError: pass # process deleting if args.delete: candidates = list() # get candidates for root, dirs, files in os.walk(args.dir): if not os.path.basename(root).startswith("."): for i in dirs: if i.startswith("."): continue if i not in groups: candidates.append(os.path.join(root, i)) for i in files: if i.startswith("."): continue candidates.append(os.path.join(root, i)) # kick out elements delete_list = list() trunc_len = len(args.dir) + len(os.path.sep) for i in candidates: if i[trunc_len:] not in elements: delete_list.append(i) # deleting if delete_list: print("Following files and dirs are not exist in ACM Server:\n") for i in delete_list: print(" - " + i) delete = True if not args.force: while True: if sys.version_info[0] == 3: choice = input("\nDeleting all files above? (y/n)") else: choice = raw_input("\nDeleting all files above? (y/n)") if choice.lower() in ["y", "n"]: delete = choice.lower() == "y" break print("Invalid choice, please input y or n.") if delete: for i in delete_list: try: if os.path.isfile(i): os.remove(i) else: shutil.rmtree(i) except OSError: pass print("Delete complete, continue to export...\n") else: zip_file = zipfile.ZipFile(dest_file, 'w', zipfile.ZIP_DEFLATED) print(_colored(len(configs), "green") + " dataIds on ACM server will be exported to %s.\n" % _colored( args.dir or dest_file, "yellow")) i = 0 for config in configs: rel_path = config["group"] if config["group"] != DEFAULT_GROUP_NAME else "" if args.dir: try: os.makedirs(os.path.join(args.dir, rel_path)) except OSError: pass i += 1 sys.stdout.write("\033[K\rExporting: %s/%s %s:%s" % (i, len(configs), config["group"], config["dataId"])) sys.stdout.flush() try: content = await c.get(config["dataId"], config["group"], no_snapshot=True) except: print("Get content of %s:%s failed." % (config["group"] or DEFAULT_GROUP_NAME, config["dataId"])) sys.exit(1) if args.dir: _write_file(os.path.join(args.dir, rel_path, config["dataId"]), content) else: zip_file.writestr(os.path.join(rel_path, config["dataId"]), content.encode("utf8")) if zip_file: zip_file.close() print("") print("All dataIds exported.\n") async def import_to_server(args): e, ep, n, ns = _process_namespace(args) c = _get_client(e, ep, n, ns) if args.dir and not os.path.isdir(args.dir): print("%s does not exist." % _colored(args.dir, "red")) sys.exit(1) src_file = args.file or args.file or "%s-%s.zip" % (e, n) zip_file = None if not args.dir and not os.path.isfile(src_file): print("%s does not exist." % _colored(src_file, "red")) sys.exit(1) data_to_import = list() if args.dir: for f in os.listdir(args.dir): if f.startswith("."): continue if os.path.isfile(os.path.join(args.dir, f)): data_to_import.append((f, DEFAULT_GROUP_NAME)) else: for ff in os.listdir(os.path.join(args.dir, f)): if not ff.startswith(".") and os.path.isfile(os.path.join(args.dir, f, ff)): data_to_import.append((ff, f)) else: zip_file = zipfile.ZipFile(src_file, 'r', zipfile.ZIP_DEFLATED) for info in zip_file.infolist(): sp = info.filename.split(os.path.sep) if len(sp) == 1: data_to_import.append((sp[0], DEFAULT_GROUP_NAME)) elif len(sp) == 2 and sp[1]: data_to_import.append((sp[1], sp[0])) else: print("ignoring invalid path: %s" % info.filename) # process deleting if args.delete: # pick up candidates delete_list = list() configs = await c.list_all() for i in configs: if (i["dataId"], i["group"]) not in data_to_import: delete_list.append(i) # deleting if delete_list: print("Following dataIds are not exist in %s:\n" % _colored(args.dir or src_file, "yellow")) for i in delete_list: print(" - %s:%s" % (i["group"], i["dataId"])) delete = True if not args.force: while True: if sys.version_info[0] == 3: choice = input("\nDeleting all dataIds above in ACM server? (y/n)") else: choice = raw_input("\nDeleting all dataIds above in ACM server? (y/n)") if choice.lower() in ["y", "n"]: delete = choice.lower() == "y" break print("Invalid choice, please input y or n.") if delete: for i in delete_list: await c.remove(i["dataId"], i["group"]) print("Delete complete, continue to import...\n") print(_colored(len(data_to_import), "green") + " files will be imported to ACM server.\n") i = 0 for data in data_to_import: i += 1 sys.stdout.write("\033[K\rImporting: %s/%s %s:%s" % (i, len(data_to_import), data[1], data[0])) sys.stdout.flush() if args.dir: f = os.path.join(args.dir, data[1], data[0]) if data[1] != DEFAULT_GROUP_NAME else os.path.join(args.dir, data[0]) content = _read_file(f) else: name = os.path.join(data[1], data[0]) if data[1] != DEFAULT_GROUP_NAME else data[0] content = zip_file.read(name) try: await c.publish(data[0], data[1], content) except: print("Publish %s/%s failed." % (data[1], data[0])) sys.exit(1) if zip_file: zip_file.close() print("") print("All files imported.\n") def _get_client(e, ep, n, ns): c = ACMClient(endpoint=e, namespace=(None if n == "[default]" else n), ak=ns["ak"], sk=ns["sk"]) if ep.get("kms_enabled"): c.set_options(kms_enabled=True, kms_ak=ns.get("kms_ak"), kms_secret=ns.get("kms_secret"), region_id=ep.get("region_id"), key_id=ns.get("key_id")) if ep["tls"]: c.set_options(tls_enabled=True) return c def arg_parse(): parser = argparse.ArgumentParser(prog="acm", description="ACM command line tools for querying and exporting data.", ) subparsers = parser.add_subparsers(help='sub-command help', title="Sub commands") # add parser_add = subparsers.add_parser("add", help="add a namespace", description='Add a namespace, ' 'update if namespace is already exist.', epilog="Example: acm add acm.aliyun.com:ea61357b-d417-460c-92e4-032677dd8153 " "-s 'GLff***xcao=' -a 654b43******e9750 -n foo") parser_add.add_argument("namespace", default=None, help='use "endpoint:namespace_id" to locate a namespace, ' 'if endpoint is missing, "acm.aliyun.com" act as default.') parser_add.add_argument("-a", dest="ak", default=None, help='AccessKey of this namespace.') parser_add.add_argument("-s", dest="sk", help='SecretKey of this namespace.') parser_add.add_argument("-n", dest="alias", help='alias of the namespace, ":" is not allowed in alias.') parser_add.add_argument("--tls", action="store_true", default=False, help="to use TLS connection.") parser_add.add_argument("--kms", action="store_true", default=False, help="to use Key Management Service (KMS).") parser_add.add_argument("-ka", dest="kms_ak", default=None, help='AccessKey for KMS, use AccessKey by default, required if KMS is enabled.') parser_add.add_argument("-ks", dest="kms_secret", default=None, help='SecretKey for KMS, use SecretKey by default, required if KMS is enabled.') parser_add.add_argument("-k", dest="key_id", default=None, help='Key ID of KMS, required if KMS is enabled.') parser_add.add_argument("-r", dest="region_id", default=None, help='Region ID of Alibaba Cloud, required if KMS is enabled.') parser_add.set_defaults(func=add) # use parser_use = subparsers.add_parser("use", help="switch to a namespace", description="Switch to a namespace.", epilog="Example: acm use acm.aliyun.com:ea61357b-d417-460c-92e4-032677dd8153") parser_use.add_argument("namespace", help='"endpoint:namespace_id" or alias to use.') parser_use.set_defaults(func=use) # current parser_current = subparsers.add_parser("current", help="show current namespace", description="Show current namespace.") parser_current.set_defaults(func=current) # show parser_show = subparsers.add_parser("show", help="show all namespaces", description="Show all namespaces.") parser_show.set_defaults(func=show) # list parser_list = subparsers.add_parser("list", help="get list of dataIds", description="Get list of dataIds.") parser_list.add_argument("-g", dest="group", default=None, help='group of the dataId.') parser_list.add_argument("-p", dest="prefix", default=None, help='prefix of dataId.') parser_list.add_argument("-n", dest="namespace", default=None, help='"endpoint:namespace_id" or alias.') parser_list.set_defaults(func=list_conf) # pull parser_pull = subparsers.add_parser("pull", help="get one config content", description="Get one config content from ACM server.", epilog="Example: acm pull group/dataId > dest.txt") parser_pull.add_argument("data_id", help='the dataId to pull from, use group"/"dataId to specify group.') parser_pull.add_argument("-n", dest="namespace", default=None, help='"endpoint:namespace_id" or alias.') parser_pull.set_defaults(func=pull) # push parser_push = subparsers.add_parser("push", help="push one config", description="Push one config with the content of a local file or stdin.", epilog="Example: cat source.txt | acm push group/dataId") parser_push.add_argument("data_id", help='the dataId to store the content, use group"/"dataId to specify group.') parser_push.add_argument("-f", dest="file", default=None, help='the file to push, stdin can not be empty ' 'if file is not specified.') parser_push.add_argument("-n", dest="namespace", default=None, help='"endpoint:namespace_id" or alias.') parser_push.set_defaults(func=push) # export parser_export = subparsers.add_parser("export", help="export dataIds to local", description="Export dataIds of specified namespace to local dir or zip file.") parser_export.add_argument("-f", dest="file", default=None, help='zip file name, ' 'use "endpoint-namepspace_id.zip" as default.') parser_export.add_argument("-d", dest="dir", default=None, help='export destination dir, file is ignored ' 'if dir is specified.') parser_export.add_argument("-n", dest="namespace", default=None, help='"endpoint:namespace_id" or alias.') parser_export.add_argument("--delete", action="store_true", default=False, help="[only for dir mode] " "delete the file not exist in ACM server (hidden files startswith . are igonred).") parser_export.add_argument("--force", action="store_true", default=False, help="[only for dir mode] " "run and delete silently.") parser_export.set_defaults(func=export) # import parser_import = subparsers.add_parser("import", help="import local dir or zip file to ACM server", description="Import local dir or zip file to ACM server.") parser_import.add_argument("-f", dest="file", default=None, help='zip file name, ' 'use "endpoint-namepspace_id.zip" as default.') parser_import.add_argument("-d", dest="dir", default=None, help='import source dir, file is ignored ' 'if dir is specified.') parser_import.add_argument("-n", dest="namespace", default=None, help='"endpoint:namespace_id" or alias.') parser_import.add_argument("--delete", action="store_true", default=False, help="delete the dataId not exist locally.") parser_import.add_argument("--force", action="store_true", default=False, help="run and delete silently.") parser_import.set_defaults(func=import_to_server) return parser.parse_args() def main(): """ Usage: acm [params] Global Commands: acm add {endpoint}:{namespace_id} [-a {ak} -s {sk}] [-n {alias}] # Add or update an namespace, not mandatory. acm use {endpoint}:{namespace}/{alias} # Choose a namespace for following commands (can be overwritten by -n). acm current # Print current endpoint and acm show # Print all endpoint and namespace info. Namespace Commands(use blank namespace if not specified): acm list [-g {group}] [-p {prefix}] # Get all dataIds matching group or prefix. acm pull {dataId} # Get a config content, default group is DEFAULT_GROUP. acm push {dataId} [-f {file}] # Push one file or content from stdin to ACM acm export [-d {dir}] [-f {zip_file}] [--delete] [--force] # Export dataIds as files. --delete: If local file or directory can not match dataIds ACM, delete it. --force: Overwrite or delete files without asking. acm import [-d {dir}] [-f {zip_file}] [--delete] [--force] # Import files to ACM. --delete: If dataId or group can not match local files, delete it. --force: Overwrite or delete dataIds without asking. Examples: Configurations: |--------------|----------|------------------| | Namespace | group | dataId | +--------------+----------+------------------+ | dev| ACM| jdbc.properties| +--------------+----------+------------------+ | dev| ACM| application.yml| +--------------+----------+------------------+ | dev| VipServer| nginx.conf| +--------------+----------+------------------+ | product| ACM| jdbc.properties| +--------------+----------+------------------+ | product| ACM| application.yml| +--------------+----------+------------------+ | product| VipServer| nginx.conf| +--------------+----------+------------------+ Get started: acm add acm.aliyun.com:'60*****0d38' -a '654b4*****50' -s 'GLf****ao=' -n dev acm add acm.aliyun.com:'7u*****d9eb' -a '37ab8*****2b' -s 'XMV****xc=' -n product acm use dev Find all configurations: acm list -g ACM Get from ACM: acm pull ACM/jdbc.properties >> ACM/jdbc.properties Modify it, then publish to ACM: cat ACM/jdbc.properties | acm push ACM/jdbc.properties Clone all configs from ACM: acm export -d dev_configs Local directories: dev_configs | |-- ACM | |--jdbc.properties | |--application.yml | |-- VipServer |--nginx.conf Transfer configs to an other namespace to make an product release acm import -d dev_configs -n product Tricks Use crontab and local VCS (git,svn) to make synchronization of configurations: acm use dev acm import -d {local_repo} --delete --force """ args = arg_parse() try: asyncio.get_event_loop().run_until_complete(args.func(args)) except AttributeError: print("No sub command is specified, use -h for help.") if __name__ == "__main__": main() PK!raioacm/commons.py# coding: utf8 import asyncio from asyncio import iscoroutine def synchronized_with_attr(attr_name): def decorator(func): async def locked_func(*args, **kws): self = args[0] lock = getattr(self, attr_name) async with lock: result = func(*args, **kws) if iscoroutine(result): return await result return result def synced_func(*args, **kw): return asyncio.ensure_future(locked_func(*args, **kw)) return synced_func return decorator def truncate(ori_str, length=100): if not ori_str: return "" return ori_str[:length] + "..." if len(ori_str) > length else ori_str PK!0,P%%aioacm/files.pyimport sys import fcntl import os.path import logging logger = logging.getLogger("aioacm") def read_file(base, key): file_path = os.path.join(base, key) if not os.path.exists(file_path): return None try: if sys.version_info[0] == 3: with open(file_path, "r+", encoding="utf8", newline="") as f: fcntl.flock(f, fcntl.LOCK_EX) return f.read() else: with open(file_path, "r+") as f: fcntl.flock(f, fcntl.LOCK_EX) return f.read() except OSError: logger.exception( "[read-file] read file failed, file path:%s", file_path ) return None def save_file(base, key, content): file_path = os.path.join(base, key) if not os.path.isdir(base): try: os.makedirs(base) except OSError: logger.warning("[save-file] dir %s is already exist" % base) try: with open(file_path, "wb") as f: fcntl.flock(f, fcntl.LOCK_EX) f.write( content if type(content) == bytes else content.encode("utf8") ) except OSError: logger.error( "[save-file] save file failed, file path:%s", file_path ) def delete_file(base, key): file_path = os.path.join(base, key) try: os.remove(file_path) except OSError: logger.warning( "[delete-file] file not exists, file path:%s", file_path ) PK!̼/ppaioacm/params.pyVALID_CHAR = set(['_', '-', '.', ':']) PARAM_KEYS = ["data_id", "group"] DEFAULT_GROUP_NAME = "DEFAULT_GROUP" def is_valid(param): if not param: return False for i in param: if i.isalpha() or i.isdigit() or i in VALID_CHAR: continue return False return True def check_params(params): for p in PARAM_KEYS: if p in params and not is_valid(params[p]): return False return True def group_key(data_id, group, namespace): return "+".join([data_id, group, namespace]) def parse_key(key): sp = key.split("+") return sp[0], sp[1], sp[2] PK!7{& & aioacm/server.py# coding: utf8 import random import socket import logging from asyncio import TimeoutError from aiohttp import ClientSession, ClientError logger = logging.getLogger("aioacm") ADDRESS_URL_PTN = "http://%s/diamond-server/diamond" ADDRESS_SERVER_TIMEOUT = 3 # in seconds def is_ipv4_address(address): try: socket.inet_aton(address) except socket.error: return False return True async def get_server_list(endpoint: str, default_port: int = 8080, cai_enabled: bool = True) -> list: logger = logger.getChild("get-server-list") server_list = list() if not cai_enabled: logger.info( "cai server is not used, regard endpoint:%s " "as server.", endpoint ) content = endpoint if ':' not in endpoint: content = ':'.join([endpoint, str(default_port)]) else: try: async with ClientSession() as request: async with request.get(ADDRESS_URL_PTN % endpoint, timeout=ADDRESS_SERVER_TIMEOUT) as resp: content = await resp.text() logger.debug("content from endpoint:%s", content) except ClientError as e: logger.error( "get server from %s failed.", endpoint, exc_info=e ) return server_list except TimeoutError: logger.error( "Timeout(%s) when get server from %s.", ADDRESS_SERVER_TIMEOUT, ADDRESS_URL_PTN % endpoint ) return server_list if content: for server_info in content.strip().split("\n"): sp = server_info.strip().split(":") if len(sp) == 1: server_list.append( (sp[0], default_port, is_ipv4_address(sp[0])) ) else: try: server_list.append( (sp[0], int(sp[1]), is_ipv4_address(sp[0])) ) except ValueError: logger.warning( "bad server address:%s ignored", server_info ) random.shuffle(server_list) return server_list PK!B[,[,*aioacm_sdk_python-0.3.13.dist-info/LICENSE Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "[]" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright [yyyy] [name of copyright owner] Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.PK!HtW X\(aioacm_sdk_python-0.3.13.dist-info/WHEEL 1 0 нR^@DD[(:v~3؛UfZZC;'qܦZiyi %_Wv{h PK!HJ-  +aioacm_sdk_python-0.3.13.dist-info/METADATAY۸_O:ma&=u~b48 h$dE,}Bg>Z$53惣gi!{x<CƥAᦟJꟜz"y{zςH$gK |͸'TLibO vXy x U:0{{2Qʃ_zʆLdǾ`q{ɻ~=2oN7yĵK)@݈ nr ۥڊlg‘63xdŮy*xkN.x9ls!uN>G6ke M$Eb1AT&Sz}{,M q!t4U} c>mBK<E>2ï~ߧYt`J"R g_oWgSwpI+_B\$cAo^Q!v ~`["\_o'LLETl Ph{}68 E{o+ A?k/ N62b*BR'!oG*,i1?`m,_p[kF \ր m' J[GG7zީv!L3^(R2id` #N*ấ1jiY=o|{qwy;eg Cw/[f<[ DJMgvs sNKmE&WL(m(9C4}>Td2UE?Z^Ibfb "vcgE2+\F3 mFեHfx7^M{8@bψNjǡѳՈk.䐵cx_;ɱ#8Z t:,M-nƨW֋̀WKVI*P 44c|cz1눺Ϻde\!ĵ,4u>bɋJb~z:oG9 ҏ\=Èz Rn!&މ2,^lgQ}ҞyRclgsf9 Ң"z5;59P'X˖ZDn C2rhT,1Dޯ!/X;HD(Zh?U,n!fK[`!"Rc]CREQgZue.HxCŃ9 ˅ &p /xLp(Y&`OVm+zפɅS$:K'ϙ\lөbM%L'4UaUr7,tTv߭8͊o-$ ɫȒC R`jI J9i5&H )K@2kWן]ZxuCF̓`Z`5n*$O@ԾvڰzM;=1̶24S$#^L]zSs6]TH[ k<7v7pqC)>!ؒ7>n3 6Q| 2ߟ۶P@/8 $;EVEeޣQ aSx;f|~iғ#}6{pb <4n،?s 1x+А;#MJ (lgXdz̜ |2CP4?XH .ng=f|v5io[:*VTwRNlZ& [=C҈d*V}7$6~z)+]hha"z}+PK!HyT)aioacm_sdk_python-0.3.13.dist-info/RECORD˲c@yr:Y-$m  ~j&S0è X5e!ͰH} L1*;FF̀9cY|'Up@*6ɓ^sL|a2g"XKuYUިaVeƾGmvaipS^%Y-Թh2l蠡yE>Ȗ+]ZMp@,2UF"ͫ^l(}*9"ӄϰ\B'xDocV78=Ė/mr>PLj-V^.y±=/HbInfS<8A5 ꊤVp*λ̫[*>zJHE,D1Ho-x1d4cS`7i~d֞)EEsy ';]-ŋ Ssh=pV2;/gP)'  !oPK!s,aioacm/__init__.pyPK!+aioacm/client.pyPK!aaioacm/command.pyPK!rb'aioacm/commons.pyPK!0,P%%p*aioacm/files.pyPK!̼/pp0aioacm/params.pyPK!7{& & `3aioacm/server.pyPK!B[,[,*<aioacm_sdk_python-0.3.13.dist-info/LICENSEPK!HtW X\(Wiaioacm_sdk_python-0.3.13.dist-info/WHEELPK!HJ-  +iaioacm_sdk_python-0.3.13.dist-info/METADATAPK!HyT)Ovaioacm_sdk_python-0.3.13.dist-info/RECORDPK x