PK!/uydophon_cloud/__init__.py# coding: utf-8 from gevent import monkey monkey.patch_all() from dophon_properties import * get_properties([DOPHON, CLOUD_CLIENT]) from dophon import cloud_client_properties as properties DOPHON = 'enhance_pkg' EUREKA = 'eureka_client' # 初始化服务单元 # import sys # for k,v in sys.modules.items(): # if str(k).startswith('dophon'): # print(f'{k}---{v}') client_cell = __import__(f'dophon_cloud.{eval(properties.center_type.upper())}', fromlist=['dophon_cloud']) # print(client_cell) __pre__all__ = [] for field in dir(client_cell): if field.startswith('__') and field.endswith('__'): continue globals()[field] = getattr(client_cell, field) __pre__all__.append(field) # print(__pre__all__) __all__ = __pre__all__ PK!s)3_;_;$dophon_cloud/enhance_pkg/__init__.py# coding: utf-8 import random from flask import Flask, jsonify, request as reqq from urllib import request, parse import socket, hashlib, re, time, threading import urllib3 import urllib3.util from dophon_logger import * from dophon import properties from .decrate_enhance import e_app,e_dophon get_logger(DOPHON).inject_logger(globals()) class micro_cell(): ''' 远程请求调用封装类,用urllib处理请求发送和响应处理 ''' # 暂时启用轮询方式访问实例 __instance_index = 0 def __init__(self, e_app, service_name: str, interface: str, data=None, headers={}, origin_req_host=None, unverifiable=False, method=None): self.__interface = interface self.__service_name = service_name.upper() self.__interface_app = e_app info = e_app.reg_info() # 启动远程调用部件初始化线程 threading.Thread(target=self.listen_instance, kwargs={ 'info': info, 'init_args': { 'data': data, 'headers': headers, 'origin_req_host': origin_req_host, 'unverifiable': unverifiable, 'method': method }, 'debug_trace': getattr(properties, 'cloud_debug_trace', False) }).start() def listen_instance(self, info: dict, init_args: dict, debug_trace: bool): """ 防止远程调用部件加载失败导致服务瘫痪 :param info: :return: """ retry = 0 while True: if self.__service_name in info: # 获取服务对应的实例列表 self.__instances = info[self.__service_name] logger.info('服务(' + self.__service_name + ')远程调用建立') break else: # 服务不存在请求注册中心获得服务实例信息 self.__interface_app.async_reg_info() # 再次检查服务信息 if self.__service_name in self.__interface_app.reg_info(): self.__instances = self.__interface_app.reg_info()[self.__service_name] logger.info('服务(' + self.__service_name + ')远程调用建立(重请求)') break else: if debug_trace and retry < 3: # 提示三次后关闭提示 logger.warning('服务(' + self.__service_name + ')不存在!,远程调用有丢失风险') retry += 1 # 阻塞等待服务启动 time.sleep(2) instance = self.__instances[self.__instance_index] # 检查服务注册信息(防止调用自身) if self.__interface_app.service_name() == self.__service_name: print('警告::使用有风险的远程调用单元(', self.__service_name, ',', self.__interface, '),可能导致请求失败') try: if instance['host'] is socket.gethostbyname(socket.getfqdn(socket.gethostname())) \ and \ instance['port'] is self.__interface_app.port(): # 检查请求路径 self.__instances.pop() instance = self.__instances[self.__instance_index] except Exception as e: raise e url = str('http://' + instance['host'] + ':' + instance['port'] + self.__interface) self.__id = instance['id'] self.__req_obj = request.Request(url=url, **init_args) def check_active(self): try: self.__req_obj except: return False else: return True def __str__(self): return hashlib.sha1( (re.sub(':', '', self.__host) + str(self.__interface)).encode('utf8')).hexdigest() def __eq__(self, other): return str(other) == self.__str__() def request(self, data=None): ''' 发起远程请求 :param data: 请求数据 :return: 请求响应 ''' pass # 重写数据集(无数据集调用初始化数据集) _data = data if data else self.__req_obj.data if hasattr(self, '__instances'): instance = self.__instances[self.__instance_index] # 检查服务注册信息(防止调用自身) if self.__interface_app.service_name() == self.__service_name: print('警告::(有风险)使用远程调用单元调用自身服务(', self.__service_name, ',', self.__interface, '),可能导致请求失败') try: if instance['host'] is socket.gethostbyname(socket.getfqdn(socket.gethostname())) \ and \ instance['port'] is self.__interface_app.port(): # 检查请求路径 self.__instances.pop() else: self.__instance_index = self.__instance_index + 1 instance = self.__instances[self.__instance_index] url = str('http://' + instance['host'] + ':' + instance['port'] + self.__interface) setattr(self.__req_obj, 'full_url', url) except Exception as e: # print(e) pass res = request.urlopen(self.__req_obj, data=bytes(parse.urlencode(_data), encoding="utf8") if _data else None) # 轮询方式访问实例 self.__instance_index = (self.__instance_index + 1) if self.__instance_index < len( self.__instances) - 2 else 0 self.__id = instance['id'] url = str('http://' + instance['host'] + ':' + instance['port'] + self.__interface) setattr(self.__req_obj, 'full_url', url) # 处理结果集 remote_result = res.data.decode('utf8') try: remote_result = eval(remote_result) except SyntaxError as se: remote_result_list = re.sub('(<\w+>|)', '\n', remote_result).split('\n') remote_result = ' '.join(remote_result_list) return remote_result else: return {'event': 404, 'msg': self.__service_name + '服务调用异常'} def pool_request(self, pool: urllib3.PoolManager, data: dict = None): """ 池化http请求方式发起远程调用 :param pool: urllib3连接池实例 :param data:请求参数 :return: """ try: # 轮询方式访问实例 instance = self.__instances[self.__instance_index] # 检查服务注册信息(防止调用自身) if self.__interface_app.service_name() == self.__service_name: print('警告::(有风险)使用远程调用单元调用自身服务(', self.__service_name, ',', self.__interface, '),可能导致请求失败') try: if instance['host'] is socket.gethostbyname(socket.getfqdn(socket.gethostname())) \ and \ instance['port'] is self.__interface_app.port(): # 检查请求路径 self.__instances.pop() else: self.__instance_index = self.__instance_index + 1 instance = self.__instances[self.__instance_index] except Exception as e: # print(e) pass url = str('http://' + instance['host'] + ':' + instance['port'] + self.__interface) res = pool.request(method=self.__req_obj.get_method(), url=url, fields=data) self.__id = instance['id'] self.__instance_index = (self.__instance_index + 1) if self.__instance_index < len( self.__instances) - 2 else 0 url = str('http://' + instance['host'] + ':' + instance['port'] + self.__interface) setattr(self.__req_obj, 'full_url', url) remote_result = res.data.decode('utf8') try: remote_result = eval(remote_result) except SyntaxError as se: remote_result_list = re.sub('(<\w+>|)', '\n', remote_result).split('\n') remote_result = ' '.join(remote_result_list) return remote_result if res.status == 200 else {'event': res.status, 'msg': self.__service_name + '服务调用异常'} except Exception as e: return {'event': 500, 'msg': self.__service_name + '服务调用异常', 'reason': str(e)} class micro_cell_list(): ''' 服务调用集合类,自带urllib3连接池 ''' __family = {} def __init__(self, app, pool_size: int = 10, properties: dict = {}): ''' 初始化远程调用实例集群 连接池默认连接数为10 配置格式:{ service_name :[ { public_interface_prefix_1:[ interface_prefix_1, interface_prefix_2, ... ] }, { public_interface_prefix_2:[ interface_prefix_1, interface_prefix_2, ... ] }, ... ] } 初始化后集群格式: { service_name: { public_interface_prefix_1:{ interface_prefix_1:, interface_prefix_2:, interface_prefix_3:, ... }, public_interface_prefix_1:{ interface_prefix_1:, interface_prefix_2:, interface_prefix_3:,... } },... },... ''' act_pool_workers = pool_size # 根据配置初始化集群 if properties: # 存在集群配置 for k, v in properties.items(): services = {} for pub_v_item in v: public_interfaces = {} for k_item in pub_v_item: if not str(k_item).startswith('/'): raise Exception('配置路径格式有误,请以/开头(service_name:' + k + ',public_prefix:' + k_item + ')') v_item = pub_v_item[k_item] interface = {} for item in v_item: if not str(item).startswith('/'): raise Exception( '配置路径格式有误,请以/开头(service_name:' + k + ',public_prefix:' + k_item + ',prefix:' + item + ')') # 实例化远程调用实例 m_cell = micro_cell(app, str(k), str(k_item) + str(item)) interface[item] = m_cell act_pool_workers += 1 public_interfaces = interface services[k_item] = public_interfaces self.__family[k.upper()] = services # 初始化连接池 self.__req_pool = urllib3.PoolManager(act_pool_workers) def request(self, service_name: str, interface: list, data: dict = None): """ 直接使用配置的服务调用单元集群发起请求 使用多线程发起请求 :param service_name: 服务名 :param interface: 接口名,格式[公用接口,细化接口] :return: """ try: _pub_interface_prefix = interface[0] except Exception: # 无法取值则默认为空 _pub_interface_prefix = '' try: _interface_prefix = interface[1] except Exception: # 无法取值则默认为空 _interface_prefix = '' args = (service_name.upper(), _pub_interface_prefix, _interface_prefix, self.__req_pool, data) try: service_cell = self.__family[service_name.upper()][_pub_interface_prefix][_interface_prefix] target_obj = lambda service_name, __pb_inter_pre, __inter_pre, pool, data=None: \ service_cell.pool_request(pool=pool, data=data) result = target_obj(*args) return result if service_cell.check_active() else { 'event': 404, 'msg': '实例不存在', 'service_name': service_name } except KeyError as ke: raise Exception('接口映射未定义: %s ' % ke) def get_cell_obj(self, service_name: str, interface: list): """ 获取对应服务调用单元实例 :param service_name: 服务名 :param interface: 接口名,格式[公用接口,细化接口] :return: """ _pub_prefix = interface[0] if interface[0] else '' _prefix = interface[1] if len(interface) > 1 and interface[1] else '' return self.__family[service_name][_pub_prefix][_prefix] def enhance(import_name, properties: dict, static_url_path=None, static_folder='static', template_folder='templates', instance_path=None, instance_relative_config=False, root_path=None): ''' 获取增强服务器实例方法 :param import_name: 实例代号,通常为__name__ :param properties: 增强配置对象,类型为json :param static_path: :param static_url_path: :param static_folder: :param template_folder: :param instance_path: :param instance_relative_config: :param root_path: :return:增强服务器实例(可作为微服务) ''' obj = e_app(import_name, properties, static_url_path=static_url_path, static_folder=static_folder, template_folder=template_folder, instance_path=instance_path, instance_relative_config=instance_relative_config, root_path=root_path) return obj def instance_not_exist(): """ 实例不存在的共用方法 :return: """ return {'event': 404, 'msg': '实例不存在'} PK!!dophon_cloud/enhance_pkg/const.pyPK!L$FF+dophon_cloud/enhance_pkg/decrate_enhance.pyimport re import socket import threading import time from flask import request as reqq, jsonify, Flask import urllib3 import random from dophon_logger import * get_logger(DOPHON).inject_logger(globals()) class e_app(Flask): ''' 服务器封装类(不支持dophon) 请求的发送和封装使用urllib3处理 简单的一个基于flask服务器的一个实例 ''' __host = '127.0.0.1' __port = 5000 __reg_url = 'http://localhost:8361/reg/service/' __reg_update_url = None __reg_info = {} __reg_heart = False __reg_center_list = [] # 建立池化连接池 __req_pool_pool = {} ''' 增强app内部定义连接池(默认10个连接数) ''' req_pool = urllib3.PoolManager() def __init__(self, import_name, properties: dict, static_url_path=None, static_folder='static', template_folder='templates', instance_path=None, instance_relative_config=False, root_path=None): ''' 初始化服务器实例 :param import_name: :param properties: :param static_url_path: :param static_folder: :param static_host: :param host_matching: :param subdomain_matching: :param template_folder: :param instance_path: :param instance_relative_config: :param root_path: ''' # 启动微服务注册参数预处理流程 if type(properties) is type(''): # 反射获取配置 try: prop = __import__(properties) self.__prop = { 'host': getattr(prop, 'host', self.__host), 'port': getattr(prop, 'port', self.__port), 'service_name': getattr(prop, 'service_name').upper(), 'health_interface': getattr(prop, 'health_interface', '/heart'), 'prefer_own_ip': getattr(prop, 'prefer_own_ip', False) } except Exception as e: raise Exception('配置文件加载失败,请检查路径,错误信息:(' + str(e) + ')') elif isinstance(properties, type(())): raise Exception('暂不支持元组类型配置,请选择其他配置') elif isinstance(properties, type({})): if 'service_name' in properties.keys(): self.__prop = { 'service_name': properties['service_name'].upper(), 'health_interface': properties[ 'health_interface'] if 'health_interface' in properties.keys() else '/heart', 'host': properties['host'] if 'host' in properties.keys() else self.__host, 'port': properties['port'] if 'port' in properties.keys() and isinstance(properties['port'], type(1)) else self.__port, 'prefer_own_ip': socket.gethostbyname( socket.getfqdn(socket.gethostname())) if 'prefer_own_ip' in properties.keys() and isinstance( properties['prefer_own_ip'], type(True)) else False } else: raise Exception('缺少必要参数(service_name)') # 初始化服务器 Flask.__init__(self, import_name, static_url_path=static_url_path, static_folder=static_folder, template_folder=template_folder, instance_path=instance_path, instance_relative_config=instance_relative_config, root_path=root_path) print('实例注册参数::', self.__prop) self.init_reg_url(properties) self.regist_myself() # 注册自身功能接口 self.add_url_rule('/heart/as/', 'receive_heart', self.receive_heart, methods=['POST']) self.add_url_rule('/heart', 'show_own_info', self.show_own_info, methods=['POST']) def init_reg_url(self, properties: dict): """ 初始化注册信息 :param properties: :return: """ self.__reg_url = (properties['reg_url'] if 'reg_url' in properties.keys() else self.__reg_url) + self.__prop[ 'service_name'] self.init_reg_update_url() def init_reg_update_url(self): """ 初始化注册信息更新接口 :return: """ self.__reg_update_url = re.sub('/reg/service/.*', '/reg/update', self.__reg_url) def regist_myself(self): """ 向注册中心注册自身服务 :return: """ while True: try: # 向注册中心发起注册 res = self.req_pool.request(method='get', url=self.__reg_url, headers={ 'prefer_ip': socket.gethostbyname(socket.getfqdn(socket.gethostname())) if self.__prop[ 'prefer_own_ip'] else self.__host, 'service_port': self.__prop['port'] }) res_data = eval(res.data.decode('utf8')) if 'DOPHON_REG_CENTER_CLUSTERS' in res_data: _cache_reg_center_list = res_data.pop('DOPHON_REG_CENTER_CLUSTERS') _cache_list = [] for val in _cache_reg_center_list.values(): _cache_list.append( 'http://' + val['host'] + ':' + str(val['port']) + '/reg/service/' + self.__prop[ 'service_name'] ) self.__reg_center_list = _cache_list self.__reg_list_info = res_data break except Exception as e: print('微服务(%s)启动失败!!,错误原因:: %s' % (self.__prop['service_name'].upper(), str(e))) if self.__reg_center_list: print('存在注册中心集群,正在从备选中重新注册') # 将当前注册路径放入备选列表 self.__reg_center_list.append(self.__reg_url) # 在备选列表中挑选一个注册中心进行注册(默认fifo) trys_url = self.__reg_center_list.pop(random.randint(0, len(self.__reg_center_list) - 1)) print('原注册路径:', self.__reg_url, '\n', '备选注册路径:', trys_url, '\n', '备选列表:', self.__reg_center_list) self.__reg_url = trys_url self.init_reg_update_url() print('30秒后重新注册') time.sleep(30) def service_name(self): return self.__prop['service_name'] def port(self): return self.__prop['port'] ''' 运行增强服务器 ''' def run(self, host=None, port=None, debug=None, **options): # 启动注册中心健康监测 threading.Thread(target=self.check_reg_center).start() self.config['JSON_AS_ASCII'] = False # 初始化注册服务实例(根据注册时返回的实例信息) Flask.run(self, host=self.__prop['host'], port=self.__prop['port'], ) def update_reg_info(self, r): """ 更新微服务集群信息 :param r: :return: """ if not isinstance(r, type({})): for k in r.keys(): v = r[k] r[k] = eval(v) if hash(str(self.__reg_info)) != hash(str(r)): logger.info('更新实例注册信息, %s %s' % (str(r), type(r))) self.__reg_info = r # 写入接收心跳标识 if not self.__reg_heart: self.__reg_heart = True ''' 获取注册中心服务注册信息 ''' def reg_info(self): return self.__reg_info if self.__reg_info else self.__reg_list_info ''' 同步服务信息 ''' def async_reg_info(self): if not self.__req_pool_pool: addr = self.__reg_url.split('//')[1].split('/')[0].split(':') self.__req_pool_pool = urllib3.HTTPConnectionPool(addr[0], int(addr[1]), maxsize=10, block=False, retries=3, timeout=urllib3.util.Timeout.DEFAULT_TIMEOUT) try: # 暂时使用重注册更新服务信息 res = self.__req_pool_pool.request(method='get', url=self.__reg_update_url) result = eval(res.data.decode('utf8')) self.__reg_info = result return result except: return {} def check_reg_center(self): if self.__reg_heart: # 存在接收心跳标识 return else: # 定期检查注册中心 while True: # 五分钟检查一次(默认) time.sleep(300) try: res = self.req_pool.request(url=re.sub('/reg/service/.*', '/health', self.__reg_url), method='GET') print('注册中心存活' if res.status == 200 else '注册中心失活(无响应)') except Exception as e: print('注册中心失活', str(e)) finally: self.regist_myself() def receive_heart(self, service_name): ''' 接收注册中心心跳接口 :return: 心跳接收信息 ''' # print('收到心跳!!!') if reqq.is_json: reg_info = reqq.json else: reg_info = reqq.form.to_dict() self.update_reg_info(reg_info) # 检查自身注册信息是否正确 if self.__prop['service_name'] == service_name: return jsonify({'event': 200, 'msg': '收到心跳'}) else: # 自身实例注册信息有误 return jsonify({'event': 404, 'msg': '收到心跳'}) ''' 微服务调用单元列表初始化(暂时弃用 ) ''' # def micro_request_init(self,service,interface=[]): # m_c_l=micro_cell_list() # # 查找服务是否存在 # for ser in service: # for inter in interface: # if ser not in self.__reg_info.keys(): # # 不存在已注册服务中提示警告 # print('<警告>服务',ser,'未找到注册实例,接口',inter,'有失效风险') # m_c_l.append(micro_cell(ser,inter)) # else: # for ser_int in self.__reg_list_info[self.__reg_list_info[ser]]: # print(ser_int) def show_own_info(self): """ 展示自身信息 :return: """ return jsonify(self.__reg_info) class e_dophon(): ''' 服务器封装类(支持dophon实例) 请求的发送和封装使用urllib3处理 简单的一个基于flask服务器的一个实例 ''' __reg_url = 'http://localhost:8361/reg/service/' __reg_update_url = None __reg_info = {} __reg_heart = False ''' 增强app内部定义连接池(默认10个连接数) ''' req_pool = urllib3.PoolManager() def __init__(self, service_name: str, reg_center_addr: tuple, instance=None): self.__dophon_instance = instance if instance else __import__('dophon.boot', fromlist=True) self.__dophon_app = getattr(self.__dophon_instance, 'app') # 读取服务器配置 prop = __import__('dophon.cloud.client.properties', fromlist=True) # 写入服务名 setattr(prop, 'service_name', service_name) # 写入注册中心信息 setattr(prop, 'reg_url', '%s:%s%s' % reg_center_addr) # 启动微服务注册参数预处理流程 if hasattr(prop, 'service_name'): self.__prop = { 'service_name': prop.service_name.upper(), 'health_interface': prop[ 'health_interface'] if hasattr(prop, 'health_interface') else '/heart', 'host': prop.host if hasattr(prop, 'host') else self.__host, 'port': prop.port if hasattr(prop, 'port') and isinstance(prop.port, type(1)) else self.__port, 'prefer_own_ip': prop.prefer_ip_str if hasattr(prop, 'prefer_ip_str') else socket.gethostbyname(socket.getfqdn(socket.gethostname())) if hasattr(prop, 'prefer_own_ip') and isinstance(prop.prefer_own_ip, type(True)) else False } else: raise Exception('缺少必要参数(service_name)') print('实例注册参数::', self.__prop) self.__reg_url = (prop.reg_url if hasattr(prop, 'reg_url') else self.__reg_url) + self.__prop[ 'service_name'] self.__reg_update_url = re.sub('/reg/service/.*', '/reg/update', self.__reg_url) self.regist_myself() # 绑定自身功能接口 self.__dophon_app.add_url_rule('/heart/as/', 'receive_heart', self.receive_heart, methods=['POST']) self.__dophon_app.add_url_rule('/heart', 'show_own_info', self.show_own_info, methods=['POST']) def regist_myself(self): """ 向注册中心注册自身服务 :return: """ while True: try: # 向注册中心发起注册 res = self.req_pool.request(method='get', url=self.__reg_url, headers={ 'prefer_ip': self.__prop['prefer_own_ip'] if self.__prop['prefer_own_ip'] else self.__prop['host'], 'service_port': self.__prop['port'] }) self.__reg_list_info = eval(res.data.decode('utf8')) break except Exception as e: print('微服务(%s)启动失败!!,错误原因:: %s' % (self.__prop['service_name'].upper(), str(e))) print('30秒后重新注册') time.sleep(30) def service_name(self): return self.__prop['service_name'] def port(self): return self.__prop['port'] ''' 运行增强服务器 ''' def run(self, host=None, port=None, debug=None, **options): # 启动注册中心健康监测 threading.Thread(target=self.check_reg_center).start() # 初始化注册服务实例(根据注册时返回的实例信息) self.__dophon_instance.run_app() ''' 更新微服务集群信息 ''' def update_reg_info(self, r): print('更新实例注册信息,', str(r), type(r)) if not isinstance(r, type({})): for k in r.keys(): v = r[k] r[k] = eval(v) self.__reg_info = r # 写入接收心跳标识 if not self.__reg_heart: self.__reg_heart = True ''' 获取注册中心服务注册信息 ''' def reg_info(self): return self.__reg_info if self.__reg_info else self.__reg_list_info ''' 同步服务信息 ''' def async_reg_info(self): # 暂时使用重注册更新服务信息 res = self.req_pool.request(method='get', url=self.__reg_update_url) result = eval(res.data.decode('utf8')) self.__reg_info = result return result def check_reg_center(self): if self.__reg_heart: # 存在接收心跳标识 return else: # 定期检查注册中心 while True: # 五分钟检查一次(默认) time.sleep(300) try: res = self.req_pool.request(url=re.sub('/reg/service/.*', '/health', self.__reg_url), method='GET') self.regist_myself() print('注册中心存活' if res.status == 200 else '注册中心失活') except Exception as e: print('注册中心失活', str(e)) def receive_heart(self, service_name): ''' 接收注册中心心跳接口 :return: 心跳接收信息 ''' # print('收到心跳!!!') if reqq.is_json: reg_info = reqq.json else: reg_info = reqq.form.to_dict() self.update_reg_info(reg_info) # 检查自身注册信息是否正确 if self.__prop['service_name'] == service_name: return jsonify({'event': 200, 'msg': '收到心跳'}) else: # 自身实例注册信息有误 return jsonify({'event': 404, 'msg': '收到心跳'}) ''' 微服务调用单元列表初始化(暂时弃用 ) ''' # def micro_request_init(self,service,interface=[]): # m_c_l=micro_cell_list() # # 查找服务是否存在 # for ser in service: # for inter in interface: # if ser not in self.__reg_info.keys(): # # 不存在已注册服务中提示警告 # print('<警告>服务',ser,'未找到注册实例,接口',inter,'有失效风险') # m_c_l.append(micro_cell(ser,inter)) # else: # for ser_int in self.__reg_list_info[self.__reg_list_info[ser]]: # print(ser_int) def show_own_info(self): """ 展示自身信息 :return: """ return jsonify(self.__reg_info) PK!\; &dophon_cloud/eureka_client/__init__.py# encoding:utf-8 """ 此处使用 keijack/python-eureka-client 作为与eureka对接的单元 也可以使用Spring自带的sidecar 感谢keijack的项目 项目仓库: https://gitee.com/keijack/python-eureka-client.git """ from dophon import cloud_client_properties as properties from dophon_logger import * from ..utils import get_host_ip name = "eureka_client" get_logger(DOPHON).inject_logger(globals()) from .eureka_client import * # from .eureka_client_urllib3 import * assert properties.client, '无法获取client配置,请检查配置文件' client_config = properties.client app_name = client_config.name if hasattr(client_config, 'name') else __name__ ins_host = client_config.host if hasattr(client_config, 'host') else get_host_ip() # properties.host ins_ip = (client_config.ip if hasattr(client_config, 'ip') else get_host_ip()) \ if client_config.prefer_ip else 'localhost' # properties.ip ins_port = client_config.port if hasattr(client_config, 'port') else properties.port ha_type = eval(client_config.ha if hasattr(client_config, 'ha') else 'HA_STRATEGY_RANDOM') # 断言校验 assert app_name and isinstance(app_name, str), 'app_name参数不正确,请检查参数' assert ins_host and isinstance(ins_host, str), 'ins_host参数不正确,请检查参数' assert ins_ip and isinstance(ins_ip, str), 'ins_ip参数不正确,请检查参数' assert ins_port and isinstance(ins_port, int) and ins_port >= 0, 'ins_port参数不正确,请检查参数' assert ins_port == properties.port, f'实例监听端口异常{ins_port} -- {properties.port}' logger.info(f'初始化服务单元{client_config}') # 启动eureka_client单元 # The flowing code will register your server to eureka server and also start to send heartbeat every 30 seconds client_proxy, discovery_proxy = init(eureka_server=client_config.center, app_name=app_name, # 当前组件的主机名,可选参数,如果不填写会自动计算一个,如果服务和 eureka 服务器部署在同一台机器,请必须填写,否则会计算出 127.0.0.1 instance_host=ins_host, instance_port=ins_port, instance_ip=ins_ip, # 调用其他服务时的高可用策略,可选,默认为随机 ha_strategy=ha_type) logger.info(f'服务单元初始化完毕') from dophon import boot from dophon.annotation import * bean(name='discovery_proxy')(lambda: discovery_proxy)() def open_cloud(f, *args, **kwargs): @boot.BeanScan() def inner_method(): try: f(boot=boot, *args, **kwargs) except: f(*args, **kwargs) stop() return return inner_method OpenCloud = open_cloud __all__ = ['OpenCloud'] PK!m(/dophon_cloud/eureka_client/__urlopen_proxy__.py# -*- coding: utf-8 -*- import socket import re import base64 from urllib3 import PoolManager POOL = PoolManager() try: import urllib.request as urllib2 from urllib.error import HTTPError from urllib.error import URLError except ImportError: import urllib2 from urllib2 import HTTPError from urllib2 import URLError _URL_REGEX = re.compile( r'^(?:http)s?://' # http:// or https:// r'(([A-Z0-9_~!.%]+):([A-Z0-9_~!.%]+)@)?' # basic authentication -> username:password@ r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|' # domain... r'(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?)|' # domain name without `.` r'localhost|' # localhost... r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # ...or ip r'(?::\d+)?' # optional port r'(?:/?|[/?]\S+)$', re.IGNORECASE) def get_url_and_basic_auth(addr_url): addr = addr_url match_obj = _URL_REGEX.match(addr) groups = match_obj.groups() if (groups[0] is not None): addr = addr.replace(groups[0], "") user_name = groups[1] user_psw = groups[2] ori_auth = ("%s:%s" % (user_name, user_psw)).encode() auth_str = base64.standard_b64encode(ori_auth).decode() return (addr, auth_str) else: return (addr, None) class Request: def __init__(self, url, data=None, headers={}, origin_req_host=None, unverifiable=False, method=None): url_match = _URL_REGEX.match(url) if url_match is None: raise URLError("Unvalid URL") url_obj = get_url_and_basic_auth(url) url_addr = url_obj[0] url_auth = url_obj[1] try: # super(Request, self).__init__(url_addr, data=data, headers=headers, # origin_req_host=origin_req_host, unverifiable=unverifiable, # method=method) self.__url_adddr = url_addr self.__data = data self.__headers = headers self.__origin_req_host = origin_req_host self.__unverifiable = unverifiable self.__method = method except TypeError: # super(Request, self).__init__(url_addr, data=data, headers=headers, # origin_req_host=origin_req_host, unverifiable=unverifiable) self.get_method = lambda: method if method is not None else "GET" self.__url_adddr = url_addr self.__data = data self.__headers = headers self.__origin_req_host = origin_req_host self.__unverifiable = unverifiable self.__method = self.get_method() if url_auth is not None: self.__headers['Authorization'] = f'Basic {url_auth}' def urlopen(self, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, *args, **kwargs): logger.info('execute self urlopen') __kwargs = { 'method': self.__method, 'url': self.__url_adddr, 'body' if self.__method == 'POST' else 'field': self.__data, 'timeout': timeout } for k, v in kwargs.items(): __kwargs[k] = v return POOL.request( *args, **__kwargs ) def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, cafile=None, capath=None, cadefault=False, context=None): if isinstance(url, Request): return url.urlopen(timeout=timeout, data=data, cafile=cafile, capath=capath, cadefault=cadefault, context=context) elif isinstance(url, str): request = Request(url, data=data) return request.urlopen(data=data, timeout=timeout, cafile=cafile, capath=capath, cadefault=cadefault, context=context) else: raise URLError("Unvalid URL") PK!+dophon_cloud/eureka_client/eureka_client.py# -*- coding: utf-8 -*- import atexit import json import os import re import socket import time import random import inspect import xml.etree.ElementTree as ElementTree from threading import Timer from threading import Lock from threading import Thread from dophon_logger import * get_logger(DOPHON).inject_logger(globals()) try: from urllib.parse import urlparse except ImportError: from urlparse import urlparse import py_eureka_client.__urlopen_proxy__ as urllib2 try: long(0) except NameError: # python 3 does no longer support long method, use int instead long = int """ Status of instances """ INSTANCE_STATUS_UP = "UP" INSTANCE_STATUS_DOWN = "DOWN" INSTANCE_STATUS_STARTING = "STARTING" INSTANCE_STATUS_OUT_OF_SERVICE = "OUT_OF_SERVICE" INSTANCE_STATUS_UNKNOWN = "UNKNOWN" """ Action type of instances """ ACTION_TYPE_ADDED = "ADDED" ACTION_TYPE_MODIFIED = "MODIFIED" ACTION_TYPE_DELETED = "DELETED" """ This is for the DiscoveryClient, when this strategy is set, get_service_url will random choose one of the UP instance and return its url This is the default strategy """ HA_STRATEGY_RANDOM = 1 """ This is for the DiscoveryClient, when this strategy is set, get_service_url will always return one instance until it is down """ HA_STRATEGY_STICK = 2 """ This is for the DiscoveryClient, when this strategy is set, get_service_url will always return a new instance if any other instances are up """ HA_STRATEGY_OTHER = 3 """ The timeout seconds that all http request to the eureka server """ _DEFAULT_TIME_OUT = 5 """ Default eureka server url. """ _DEFAULT_EUREKA_SERVER_URL = "http://127.0.0.1:8761/eureka/" """ Default instance field values """ _DEFAULT_INSTNACE_PORT = 9090 _DEFAULT_INSTNACE_SECURE_PORT = 9443 _RENEWAL_INTERVAL_IN_SECS = 30 _DURATION_IN_SECS = 90 _DEFAULT_DATA_CENTER_INFO = "MyOwn" _DEFAULT_DATA_CENTER_INFO_CLASS = "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo" """ Default encoding """ _DEFAULT_ENCODING = "utf-8" ### =========================> Base Mehods <======================================== ### ### Beans ### class Applications: def __init__(self, apps__hashcode="", versions__delta="", applications=None): self.apps__hashcode = apps__hashcode self.versions__delta = versions__delta self.__applications = applications if applications is not None else [] self.__application_name_dic = {} self.__app_lock = Lock() @property def appsHashcode(self): return self.apps__hashcode @property def applications(self): return self.__applications @property def versionsDelta(self): return self.versions__delta def add_application(self, application): with self.__app_lock: self.__applications.append(application) self.__application_name_dic[application.name] = application def get_application(self, app_name): with self.__app_lock: if app_name in self.__application_name_dic: return self.__application_name_dic[app_name] else: return Application(name=app_name) class Application: def __init__(self, name="", instances=None): self.name = name self.__instances = instances if instances is not None else [] self.__instances_dict = {} self.__inst_lock = Lock() @property def instances(self): with self.__inst_lock: return self.__instances @property def up_instances(self): with self.__inst_lock: up_inst = [] for item in self.__instances: if item.status == INSTANCE_STATUS_UP: up_inst.append(item) return up_inst def get_instance(self, instance_id): with self.__inst_lock: if instance_id in self.__instances_dict: return self.__instances_dict[instance_id] else: return None def add_instance(self, instance): with self.__inst_lock: self.__instances.append(instance) self.__instances_dict[instance.instanceId] = instance def update_instance(self, instance): with self.__inst_lock: logger.debug("update instance %s" % instance.instanceId) updated = False for idx in range(len(self.__instances)): ele = self.__instances[idx] if ele.instanceId == instance.instanceId: logger.debug("updating index %d" % idx) self.__instances[idx] = instance updated = True break if not updated: self.add_instance(instance) def remove_instance(self, instance): with self.__inst_lock: for idx in range(len(self.__instances)): ele = self.__instances[idx] if ele.instanceId == instance.instanceId: del self.__instances[idx] break if instance.instanceId in self.__instances_dict: del self.__instances_dict[instance.instanceId] class LeaseInfo: def __init__(self, renewalIntervalInSecs=_RENEWAL_INTERVAL_IN_SECS, durationInSecs=_DURATION_IN_SECS, registrationTimestamp=0, lastRenewalTimestamp=0, renewalTimestamp=0, evictionTimestamp=0, serviceUpTimestamp=0): self.renewalIntervalInSecs = renewalIntervalInSecs self.durationInSecs = durationInSecs self.registrationTimestamp = registrationTimestamp self.lastRenewalTimestamp = lastRenewalTimestamp self.renewalTimestamp = renewalTimestamp self.evictionTimestamp = evictionTimestamp self.serviceUpTimestamp = serviceUpTimestamp class DataCenterInfo: def __init__(self, name=_DEFAULT_DATA_CENTER_INFO, # Netflix, Amazon, MyOwn className=_DEFAULT_DATA_CENTER_INFO_CLASS): self.name = name self.className = className class PortWrapper: def __init__(self, port=0, enabled=False): self.port = port self.enabled = enabled class Instance: def __init__(self, instanceId="", sid="", # @deprecated app="", appGroupName="", ipAddr="", port=PortWrapper(port=_DEFAULT_INSTNACE_PORT, enabled=True), securePort=PortWrapper(port=_DEFAULT_INSTNACE_SECURE_PORT, enabled=False), homePageUrl="", statusPageUrl="", healthCheckUrl="", secureHealthCheckUrl="", vipAddress="", secureVipAddress="", countryId=1, dataCenterInfo=DataCenterInfo(), hostName="", status="", # UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN overriddenstatus="", # UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN leaseInfo=LeaseInfo(), isCoordinatingDiscoveryServer=False, metadata=None, lastUpdatedTimestamp=0, lastDirtyTimestamp=0, actionType=ACTION_TYPE_ADDED, # ADDED, MODIFIED, DELETED asgName=""): self.instanceId = instanceId self.sid = sid self.app = app self.appGroupName = appGroupName self.ipAddr = ipAddr self.port = port self.securePort = securePort self.homePageUrl = homePageUrl self.statusPageUrl = statusPageUrl self.healthCheckUrl = healthCheckUrl self.secureHealthCheckUrl = secureHealthCheckUrl self.vipAddress = vipAddress self.secureVipAddress = secureVipAddress self.countryId = countryId self.dataCenterInfo = dataCenterInfo self.hostName = hostName self.status = status self.overriddenstatus = overriddenstatus self.leaseInfo = leaseInfo self.isCoordinatingDiscoveryServer = isCoordinatingDiscoveryServer self.metadata = metadata if metadata is not None else {} self.lastUpdatedTimestamp = lastUpdatedTimestamp self.lastDirtyTimestamp = lastDirtyTimestamp self.actionType = actionType self.asgName = asgName ########################## Basic functions ################################# ####### Registry functions ######### def register(eureka_server, instance): instance_dic = { 'instanceId': instance.instanceId, 'hostName': instance.hostName, 'app': instance.app, 'ipAddr': instance.ipAddr, 'status': instance.status, 'overriddenstatus': instance.overriddenstatus, 'port': { '$': instance.port.port, '@enabled': str(instance.port.enabled).lower() }, 'securePort': { '$': instance.securePort.port, '@enabled': str(instance.securePort.enabled).lower() }, 'countryId': instance.countryId, 'dataCenterInfo': { '@class': instance.dataCenterInfo.className, 'name': instance.dataCenterInfo.name }, 'leaseInfo': { 'renewalIntervalInSecs': instance.leaseInfo.renewalIntervalInSecs, 'durationInSecs': instance.leaseInfo.durationInSecs, 'registrationTimestamp': instance.leaseInfo.registrationTimestamp, 'lastRenewalTimestamp': instance.leaseInfo.lastRenewalTimestamp, 'evictionTimestamp': instance.leaseInfo.evictionTimestamp, 'serviceUpTimestamp': instance.leaseInfo.serviceUpTimestamp }, 'metadata': instance.metadata, 'homePageUrl': instance.homePageUrl, 'statusPageUrl': instance.statusPageUrl, 'healthCheckUrl': instance.healthCheckUrl, 'vipAddress': instance.vipAddress, 'secureVipAddress': instance.secureVipAddress, 'lastUpdatedTimestamp': str(instance.lastUpdatedTimestamp), 'lastDirtyTimestamp': str(instance.lastDirtyTimestamp), 'isCoordinatingDiscoveryServer': str(instance.isCoordinatingDiscoveryServer).lower() } _register(eureka_server, instance_dic) def _register(eureka_server, instance_dic): req = urllib2.Request(_format_url(eureka_server) + "apps/%s" % instance_dic["app"]) req.add_header('Content-Type', 'application/json') req.get_method = lambda: "POST" response = urllib2.urlopen(req, json.dumps({"instance": instance_dic}).encode(_DEFAULT_ENCODING), timeout=_DEFAULT_TIME_OUT) response.close() def cancel(eureka_server, app_name, instance_id): req = urllib2.Request(_format_url(eureka_server) + "apps/%s/%s" % (app_name, instance_id)) req.get_method = lambda: "DELETE" response = urllib2.urlopen(req, timeout=_DEFAULT_TIME_OUT) response.close() def send_heart_beat(eureka_server, app_name, instance_id, last_dirty_timestamp, status=INSTANCE_STATUS_UP, overriddenstatus=""): url = f"""{_format_url(eureka_server)}apps/{app_name}/{instance_id}?status={status}&lastDirtyTimestamp={str( last_dirty_timestamp)}""" logger.debug("heartbeat url::" + url) if overriddenstatus != "": url += "&overriddenstatus=" + overriddenstatus req = urllib2.Request(url) req.get_method = lambda: "PUT" response = urllib2.urlopen(req, timeout=_DEFAULT_TIME_OUT) response.close() def status_update(eureka_server, app_name, instance_id, last_dirty_timestamp, status): url = _format_url(eureka_server) + "apps/%s/%s?status=%s&lastDirtyTimestamp=%s" % \ (app_name, instance_id, status, str(last_dirty_timestamp)) req = urllib2.Request(url) req.get_method = lambda: "PUT" response = urllib2.urlopen(req, timeout=_DEFAULT_TIME_OUT) response.close() def delete_status_override(eureka_server, app_name, instance_id, last_dirty_timestamp): url = _format_url(eureka_server) + "apps/%s/%s/status?lastDirtyTimestamp=%s" % \ (app_name, instance_id, str(last_dirty_timestamp)) req = urllib2.Request(url) req.get_method = lambda: "DELETE" response = urllib2.urlopen(req, timeout=_DEFAULT_TIME_OUT) response.close() ####### Discovory functions ######## def get_applications(eureka_server, regions=[]): return _get_applications_(_format_url(eureka_server) + "apps/", regions) def _format_url(url): if url.endswith('/'): return url else: return url + "/" def _get_applications_(url, regions=[]): _url = url if len(regions) > 0: _url = _url + ("&" if "?" in _url else "?") + "regions=" + (",".join(regions)) f = urllib2.urlopen(_url, timeout=_DEFAULT_TIME_OUT) txt = f.read().decode(_DEFAULT_ENCODING) f.close() return _build_applications(ElementTree.fromstring(txt)) def _build_applications(xml_node): if xml_node.tag != "applications": return None applications = Applications() for child_node in xml_node.getchildren(): if child_node.tag == "versions__delta" and child_node.text is not None: applications.versions__delta = child_node.text elif child_node.tag == "apps__hashcode" and child_node.text is not None: applications.apps__hashcode = child_node.text elif child_node.tag == "application": applications.add_application(_build_application(child_node)) return applications def _build_application(xml_node): if xml_node.tag != "application": return None application = Application() for child_node in xml_node: if child_node.tag == "name": application.name = child_node.text elif child_node.tag == "instance": application.add_instance(_build_instance(child_node)) return application def _build_instance(xml_node): if xml_node.tag != "instance": return None instance = Instance() for child_node in xml_node: if child_node.tag == "instanceId": instance.instanceId = child_node.text elif child_node.tag == "sid": instance.sid = child_node.text elif child_node.tag == "app": instance.app = child_node.text elif child_node.tag == "appGroupName": instance.appGroupName = child_node.text elif child_node.tag == "ipAddr": instance.ipAddr = child_node.text elif child_node.tag == "port": instance.port = _build_port(child_node) elif child_node.tag == "securePort": instance.securePort = _build_port(child_node) elif child_node.tag == "homePageUrl": instance.homePageUrl = child_node.text elif child_node.tag == "statusPageUrl": instance.statusPageUrl = child_node.text elif child_node.tag == "healthCheckUrl": instance.healthCheckUrl = child_node.text elif child_node.tag == "secureHealthCheckUrl": instance.secureHealthCheckUrl = child_node.text elif child_node.tag == "vipAddress": instance.vipAddress = child_node.text elif child_node.tag == "secureVipAddress": instance.secureVipAddress = child_node.text elif child_node.tag == "countryId": instance.countryId = int(child_node.text) elif child_node.tag == "dataCenterInfo": instance.dataCenterInfo = DataCenterInfo(name=child_node.text, className=child_node.attrib["class"]) elif child_node.tag == "hostName": instance.hostName = child_node.text elif child_node.tag == "status": instance.status = child_node.text elif child_node.tag == "overriddenstatus": instance.overriddenstatus = child_node.text elif child_node.tag == "leaseInfo": instance.leaseInfo = _build_lease_info(child_node) elif child_node.tag == "isCoordinatingDiscoveryServer": instance.isCoordinatingDiscoveryServer = (child_node.text == "true") elif child_node.tag == "metadata": instance.metadata = _build_metadata(child_node) elif child_node.tag == "lastUpdatedTimestamp": instance.lastUpdatedTimestamp = long(child_node.text) elif child_node.tag == "lastDirtyTimestamp": instance.lastDirtyTimestamp = long(child_node.text) elif child_node.tag == "actionType": instance.actionType = child_node.text elif child_node.tag == "asgName": instance.asgName = child_node.text return instance def _build_metadata(xml_node): metadata = {} for child_node in xml_node.getchildren(): metadata[child_node.tag] = child_node.text return metadata def _build_lease_info(xml_node): leaseInfo = LeaseInfo() for child_node in xml_node.getchildren(): if child_node.tag == "renewalIntervalInSecs": leaseInfo.renewalIntervalInSecs = int(child_node.text) elif child_node.tag == "durationInSecs": leaseInfo.durationInSecs = int(child_node.text) elif child_node.tag == "registrationTimestamp": leaseInfo.registrationTimestamp = long(child_node.text) elif child_node.tag == "lastRenewalTimestamp": leaseInfo.lastRenewalTimestamp = long(child_node.text) elif child_node.tag == "renewalTimestamp": leaseInfo.renewalTimestamp = long(child_node.text) elif child_node.tag == "evictionTimestamp": leaseInfo.evictionTimestamp = long(child_node.text) elif child_node.tag == "serviceUpTimestamp": leaseInfo.serviceUpTimestamp = long(child_node.text) return leaseInfo def _build_port(xml_node): port = PortWrapper() port.port = int(xml_node.text) port.enabled = (xml_node.attrib["enabled"] == "true") return port def get_delta(eureka_server, regions=[]): return _get_applications_(_format_url(eureka_server) + "apps/delta", regions) def get_vip(eureka_server, vip, regions=[]): return _get_applications_(_format_url(eureka_server) + "vips/" + vip, regions) def get_secure_vip(eureka_server, svip, regions=[]): return _get_applications_(_format_url(eureka_server) + "svips/" + svip, regions) def get_application(eureka_server, app_name): url = _format_url(eureka_server) + "apps/" + app_name f = urllib2.urlopen(url, timeout=_DEFAULT_TIME_OUT) txt = f.read().decode(_DEFAULT_ENCODING) f.close() return _build_application(ElementTree.fromstring(txt)) def get_app_instance(eureka_server, app_name, instance_id): return _get_instance_(_format_url(eureka_server) + "apps/%s/%s" % (app_name, instance_id)) def get_instance(eureka_server, instance_id): return _get_instance_(_format_url(eureka_server) + "instances/" + instance_id) def _get_instance_(url): f = urllib2.urlopen(url, timeout=_DEFAULT_TIME_OUT) txt = f.read().decode(_DEFAULT_ENCODING) f.close() return _build_instance(ElementTree.fromstring(txt)) def _current_time_millis(): return int(time.time() * 1000) """====================== Registry Client =======================================""" class RegistryClient: """Eureka client for spring cloud""" def __init__(self, eureka_server=_DEFAULT_EUREKA_SERVER_URL, app_name="", instance_id="", instance_host="", instance_ip="", instance_port=_DEFAULT_INSTNACE_PORT, instance_unsecure_port_enabled=True, instance_secure_port=_DEFAULT_INSTNACE_SECURE_PORT, instance_secure_port_enabled=False, countryId=1, # @deprecaded data_center_name=_DEFAULT_DATA_CENTER_INFO, # Netflix, Amazon, MyOwn renewal_interval_in_secs=_RENEWAL_INTERVAL_IN_SECS, duration_in_secs=_DURATION_IN_SECS, home_page_url="", status_page_url="", health_check_url="", vip_adr="", secure_vip_addr="", is_coordinating_discovery_server=False): assert eureka_server is not None and eureka_server != "", "eureka server must be specified." assert app_name is not None and app_name != "", "application name must be specified." assert instance_port > 0, "port is unvalid" self.__net_lock = Lock() self.__eureka_servers = eureka_server.split(",") def try_to_get_client_ip(url): url_addr = urllib2.get_url_and_basic_auth(url)[0] if instance_host == "" and instance_ip == "": self.__instance_host = self.__instance_ip = RegistryClient.__get_instance_ip(url_addr) elif instance_host != "" and instance_ip == "": self.__instance_host = instance_host if RegistryClient.__is_ip(instance_host): self.__instance_ip = instance_host else: self.__instance_ip = RegistryClient.__get_instance_ip(url_addr) else: self.__instance_host = instance_ip self.__instance_ip = instance_ip self.__try_all_eureka_server(try_to_get_client_ip) self.__instance = { 'instanceId': instance_id if instance_id != "" else "%s:%s:%d" % ( self.__instance_host, app_name.lower(), instance_port), 'hostName': self.__instance_host, 'app': app_name.upper(), 'ipAddr': self.__instance_ip, 'port': { '$': instance_port, '@enabled': str(instance_unsecure_port_enabled).lower() }, 'securePort': { '$': instance_secure_port, '@enabled': str(instance_secure_port_enabled).lower() }, 'countryId': countryId, 'dataCenterInfo': { '@class': _DEFAULT_DATA_CENTER_INFO_CLASS, 'name': data_center_name }, 'leaseInfo': { 'renewalIntervalInSecs': renewal_interval_in_secs, 'durationInSecs': duration_in_secs, 'registrationTimestamp': 0, 'lastRenewalTimestamp': 0, 'evictionTimestamp': 0, 'serviceUpTimestamp': 0 }, 'metadata': { 'management.port': str(instance_port) }, 'homePageUrl': RegistryClient.__format_url(home_page_url, self.__instance_host, instance_port), 'statusPageUrl': RegistryClient.__format_url(status_page_url, self.__instance_host, instance_port, "info"), 'healthCheckUrl': RegistryClient.__format_url(health_check_url, self.__instance_host, instance_port, "health"), 'vipAddress': vip_adr if vip_adr != "" else app_name.lower(), 'secureVipAddress': secure_vip_addr if secure_vip_addr != "" else app_name.lower(), 'isCoordinatingDiscoveryServer': str(is_coordinating_discovery_server).lower() } self.__alive = False self.__heart_beat_timer = Timer(renewal_interval_in_secs, self.__heart_beat) self.__heart_beat_timer.daemon = True def __try_all_eureka_server(self, fun): with self.__net_lock: untry_servers = self.__eureka_servers tried_servers = [] ok = False while len(untry_servers) > 0: url = untry_servers[0].strip() try: fun(url) except (urllib2.HTTPError, urllib2.URLError) as e: logger.warning(f"Eureka server [{url}] is down, use next url to try.({e})" ) tried_servers.append(url) untry_servers = untry_servers[1:] else: ok = True break if len(tried_servers) > 0: untry_servers.extend(tried_servers) self.__eureka_servers = untry_servers if not ok: raise urllib2.URLError("All eureka servers are down!") @staticmethod def __format_url(url, host, port, defalut_ctx=""): if url != "": if url.startswith('http'): _url = url elif url.startswith('/'): _url = 'http://%s:%d%s' % (host, port, url) else: _url = 'http://%s:%d/%s' % (host, port, url) else: _url = 'http://%s:%d/%s' % (host, port, defalut_ctx) return _url @staticmethod def __is_ip(ip_str): return re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', ip_str) @staticmethod def __get_instance_ip(eureka_server): _target_ = eureka_server if not _target_.endswith('/'): _target_ += '/' url_obj = urlparse(_target_) _target_ = url_obj.netloc logger.debug("target eureka host::: %s" % _target_) if _target_.find(':') > 0: arr = _target_.split(':') target_ip = arr[0] target_port = int(arr[1]) else: target_ip = _target_ if url_obj.scheme == "http": target_port = 80 elif url_obj.scheme == "https": target_port = 443 else: raise Exception("Cannot parse your eureka url! ") s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect((target_ip, target_port)) ip = s.getsockname()[0] s.close() return ip def register(self, status=INSTANCE_STATUS_UP, overriddenstatus=INSTANCE_STATUS_UNKNOWN): self.__instance["status"] = status self.__instance["overriddenstatus"] = overriddenstatus self.__instance["lastUpdatedTimestamp"] = str(_current_time_millis()) self.__instance["lastDirtyTimestamp"] = str(_current_time_millis()) try: self.__try_all_eureka_server(lambda url: _register(url, self.__instance)) except: logger.error("error!") else: self.__alive = True def cancel(self): try: self.__try_all_eureka_server(lambda url: cancel(url, self.__instance["app"], self.__instance["instanceId"])) except: logger.error("error!") else: self.__alive = False def send_heart_beat(self, overridden_status=""): try: self.__try_all_eureka_server(lambda url: send_heart_beat(url, self.__instance["app"], self.__instance["instanceId"], self.__instance["lastDirtyTimestamp"], status=self.__instance["status"], overriddenstatus=overridden_status)) except Exception as e: logger.error(f"error!{e.__dict__}") # lost heartbeat info try: self.register() except: # try reg self failed,close this server self.stop() def status_update(self, new_status): self.__instance["status"] = new_status try: self.__try_all_eureka_server( lambda url: status_update(url, self.__instance["app"], self.__instance["instanceId"], self.__instance["lastDirtyTimestamp"], new_status)) except: logger.error("error!") def delete_status_override(self): self.__try_all_eureka_server(lambda url: delete_status_override( url, self.__instance["app"], self.__instance["instanceId"], self.__instance["lastDirtyTimestamp"])) def start(self): logger.debug("start to registry client...") self.register() self.__heart_beat_timer.daemon = True self.__heart_beat_timer.start() def stop(self): if self.__alive: logger.debug("stopping client...") if self.__heart_beat_timer.isAlive(): self.__heart_beat_timer.cancel() self.register(status=INSTANCE_STATUS_DOWN) self.cancel() def __heart_beat(self): logger.debug("sending heart beat to spring cloud server ") self.send_heart_beat() self.__heart_beat_timer = Timer(self.__instance["leaseInfo"]["renewalIntervalInSecs"], self.__heart_beat) self.__heart_beat_timer.daemon = True self.__heart_beat_timer.start() __cache_key = "default" __cache_registry_clients = {} __cache_registry_clients_lock = Lock() def init_registry_client(eureka_server=_DEFAULT_EUREKA_SERVER_URL, app_name="", instance_id="", instance_host="", instance_ip="", instance_port=_DEFAULT_INSTNACE_PORT, instance_unsecure_port_enabled=True, instance_secure_port=_DEFAULT_INSTNACE_SECURE_PORT, instance_secure_port_enabled=False, countryId=1, # @deprecaded data_center_name=_DEFAULT_DATA_CENTER_INFO, # Netflix, Amazon, MyOwn renewal_interval_in_secs=_RENEWAL_INTERVAL_IN_SECS, duration_in_secs=_DURATION_IN_SECS, home_page_url="", status_page_url="", health_check_url="", vip_adr="", secure_vip_addr="", is_coordinating_discovery_server=False): with __cache_registry_clients_lock: client = RegistryClient(eureka_server=eureka_server, app_name=app_name, instance_id=instance_id, instance_host=instance_host, instance_ip=instance_ip, instance_port=instance_port, instance_unsecure_port_enabled=instance_unsecure_port_enabled, instance_secure_port=instance_secure_port, instance_secure_port_enabled=instance_secure_port_enabled, countryId=countryId, data_center_name=data_center_name, renewal_interval_in_secs=renewal_interval_in_secs, duration_in_secs=duration_in_secs, home_page_url=home_page_url, status_page_url=status_page_url, health_check_url=health_check_url, vip_adr=vip_adr, secure_vip_addr=secure_vip_addr, is_coordinating_discovery_server=is_coordinating_discovery_server) __cache_registry_clients[__cache_key] = client client.start() return client def get_registry_client(): # type (str) -> RegistryClient with __cache_registry_clients_lock: if __cache_key in __cache_registry_clients: return __cache_registry_clients[__cache_key] else: return None """======================== Cached Discovery Client ============================""" class DiscoveryClient: """Discover the apps registered in spring cloud server, this class will do some cached, if you want to get the apps immediatly, use the global functions""" def __init__(self, eureka_server, regions=None, renewal_interval_in_secs=_RENEWAL_INTERVAL_IN_SECS, ha_strategy=HA_STRATEGY_RANDOM): assert ha_strategy in [HA_STRATEGY_RANDOM, HA_STRATEGY_STICK, HA_STRATEGY_OTHER], "do not support strategy %d " % ha_strategy self.__eureka_servers = eureka_server.split(",") self.__regions = regions if regions is not None else [] self.__cache_time_in_secs = renewal_interval_in_secs self.__applications = None self.__delta = None self.__ha_strategy = ha_strategy self.__ha_cache = {} self.__timer = Timer(self.__cache_time_in_secs, self.__heartbeat) self.__timer.daemon = True self.__application_mth_lock = Lock() self.__net_lock = Lock() def __heartbeat(self): self.__fetch_delta() self.__timer = Timer(self.__cache_time_in_secs, self.__heartbeat) self.__timer.daemon = True self.__timer.start() @property def applications(self): with self.__application_mth_lock: if self.__applications is None: self.__pull_full_registry() return self.__applications def __try_all_eureka_server(self, fun): with self.__net_lock: untry_servers = self.__eureka_servers tried_servers = [] ok = False while len(untry_servers) > 0: url = untry_servers[0].strip() try: fun(url) except (urllib2.HTTPError, urllib2.URLError): logger.warning("Eureka server [%s] is down, use next url to try." % url) tried_servers.append(url) untry_servers = untry_servers[1:] else: ok = True break if len(tried_servers) > 0: untry_servers.extend(tried_servers) self.__eureka_servers = untry_servers if not ok: raise urllib2.URLError("All eureka servers are down!") def __pull_full_registry(self): def do_pull(url): # the actual function body self.__applications = get_applications(url, self.__regions) self.__delta = self.__applications self.__try_all_eureka_server(do_pull) def __fetch_delta(self): def do_fetch(url): if self.__applications is None or len(self.__applications.applications) == 0: self.__pull_full_registry() return delta = get_delta(url, self.__regions) logger.debug("delta got: v.%s::%s" % (delta.versionsDelta, delta.appsHashcode)) if self.__delta is not None \ and delta.versionsDelta == self.__delta.versionsDelta \ and delta.appsHashcode == self.__delta.appsHashcode: return self.__merge_delta(delta) self.__delta = delta if not self.__is_hash_match(): self.__pull_full_registry() self.__try_all_eureka_server(do_fetch) def __is_hash_match(self): app_hash = self.__get_applications_hash() logger.debug("check hash, local[%s], remote[%s]" % (app_hash, self.__delta.appsHashcode)) return app_hash == self.__delta.appsHashcode def __merge_delta(self, delta): logger.debug("merge delta...length of application got from delta::%d" % len(delta.applications)) for application in delta.applications: for instance in application.instances: logger.debug("instance [%s] has %s" % (instance.instanceId, instance.actionType)) if instance.actionType in (ACTION_TYPE_ADDED, ACTION_TYPE_MODIFIED): existingApp = self.applications.get_application(application.name) if existingApp is None: self.applications.add_application(application) else: existingApp.update_instance(instance) elif instance.actionType == ACTION_TYPE_DELETED: existingApp = self.applications.get_application(application.name) if existingApp is None: self.applications.add_application(application) existingApp.remove_instance(instance) def __get_applications_hash(self): app_hash = "" app_status_count = {} for application in self.__applications.applications: for instance in application.instances: if instance.status not in app_status_count: app_status_count[instance.status.upper()] = 0 app_status_count[instance.status.upper()] = app_status_count[instance.status.upper()] + 1 sorted_app_status_count = sorted(app_status_count.items(), key=lambda item: item[0]) for item in sorted_app_status_count: app_hash = app_hash + "%s_%d_" % (item[0], item[1]) return app_hash def walk_nodes_async(self, app_name="", service="", prefer_ip=False, prefer_https=False, walker=None, on_success=None, on_error=None): def async_thread_target(): try: res = self.walk_nodes(app_name=app_name, service=service, prefer_ip=prefer_ip, prefer_https=prefer_https, walker=walker) if on_success is not None and (inspect.isfunction(on_success) or inspect.ismethod(on_success)): on_success(res) except urllib2.HTTPError as e: if on_error is not None and (inspect.isfunction(on_error) or inspect.ismethod(on_error)): on_error(e) async_thread = Thread(target=async_thread_target) async_thread.daemon = True async_thread.start() def walk_nodes(self, app_name="", service="", prefer_ip=False, prefer_https=False, walker=None): assert app_name is not None and app_name != "", "application_name should not be null" assert inspect.isfunction(walker) or inspect.ismethod(walker), "walker must be a method or function" error_nodes = [] app_name = app_name.upper() node = self.__get_availabe_service(app_name) while node is not None: try: url = self.__generate_service_url(node, prefer_ip, prefer_https) if service.startswith("/"): url = url + service[1:] else: url = url + service logger.debug("service url::" + url) return walker(url) except (urllib2.HTTPError, urllib2.URLError): logger.warning("do service %s in node [%s] error, use next node." % (service, node.instanceId)) error_nodes.append(node.instanceId) node = self.__get_availabe_service(app_name, error_nodes) raise urllib2.HTTPError("Try all up instances in registry, but all fail") def do_service_async(self, app_name="", service="", return_type="string", prefer_ip=False, prefer_https=False, on_success=None, on_error=None, method="GET", headers=None, data=None, timeout=_DEFAULT_TIME_OUT, cafile=None, capath=None, cadefault=False, context=None): def async_thread_target(): try: res = self.do_service(app_name=app_name, service=service, return_type=return_type, prefer_ip=prefer_ip, prefer_https=prefer_https, method=method, headers=headers, data=data, timeout=timeout, cafile=cafile, capath=capath, cadefault=cadefault, context=context) if on_success is not None and (inspect.isfunction(on_success) or inspect.ismethod(on_success)): on_success(res) except urllib2.HTTPError as e: if on_error is not None and (inspect.isfunction(on_error) or inspect.ismethod(on_error)): on_error(e) async_thread = Thread(target=async_thread_target) async_thread.daemon = True async_thread.start() def do_service(self, app_name="", service="", return_type="string", prefer_ip=False, prefer_https=False, method="GET", headers=None, data=None, timeout=_DEFAULT_TIME_OUT, cafile=None, capath=None, cadefault=False, context=None): def walk_using_urllib(url): req = urllib2.Request(url) req.get_method = lambda: method heads = headers if headers is not None else {} for k, v in heads.items(): req.add_header(k, v) response = urllib2.urlopen(req, data=data, timeout=timeout, cafile=cafile, capath=capath, cadefault=cadefault, context=context) res_txt = response.read().decode(_DEFAULT_ENCODING) response.close() if return_type.lower() in ("json", "dict", "dictionary"): return json.loads(res_txt) else: return res_txt return self.walk_nodes(app_name, service, prefer_ip, prefer_https, walk_using_urllib) def __get_availabe_service(self, application_name, ignore_instance_ids=None): app = self.applications.get_application(application_name) if app is None: return None up_instances = [] if ignore_instance_ids is None or len(ignore_instance_ids) == 0: up_instances.extend(app.up_instances) else: for ins in app.up_instances: if ins.instanceId not in ignore_instance_ids: up_instances.append(ins) if len(up_instances) == 0: # no up instances return None elif len(up_instances) == 1: # only one available instance, then doesn't matter which strategy is. instance = up_instances[0] self.__ha_cache[application_name] = instance.instanceId return instance def random_one(instances): if len(instances) == 1: idx = 0 else: idx = random.randint(0, len(instances) - 1) selected_instance = instances[idx] self.__ha_cache[application_name] = selected_instance.instanceId return selected_instance if self.__ha_strategy == HA_STRATEGY_RANDOM: return random_one(up_instances) elif self.__ha_strategy == HA_STRATEGY_STICK: if application_name in self.__ha_cache: cache_id = self.__ha_cache[application_name] cahce_instance = app.get_instance(cache_id) if cahce_instance is not None and cahce_instance.status == INSTANCE_STATUS_UP: return cahce_instance else: return random_one(up_instances) else: return random_one(up_instances) elif self.__ha_strategy == HA_STRATEGY_OTHER: if application_name in self.__ha_cache: cache_id = self.__ha_cache[application_name] other_instances = [] for up_instance in up_instances: if up_instance.instanceId != cache_id: other_instances.append(up_instance) return random_one(other_instances) else: return random_one(up_instances) else: return None def __generate_service_url(self, instance, prefer_ip, prefer_https): if instance is None: return None schema = "http" port = 0 if instance.port.port and not instance.securePort.enabled: schema = "http" port = instance.port.port elif not instance.port.port and instance.securePort.enabled: schema = "https" port = instance.securePort.port elif instance.port.port and instance.securePort.enabled: if prefer_https: schema = "https" port = instance.securePort.port else: schema = "http" port = instance.port.port else: assert False, "generate_service_url error: No port is available" host = instance.ipAddr if prefer_ip else instance.hostName return "%s://%s:%d/" % (schema, host, port) def start(self): self.__pull_full_registry() self.__timer.start() def stop(self): if self.__timer.isAlive(): self.__timer.cancel() __cache_discovery_clients = {} __cache_discovery_clients_lock = Lock() def init_discovery_client(eureka_server=_DEFAULT_EUREKA_SERVER_URL, regions=[], renewal_interval_in_secs=_RENEWAL_INTERVAL_IN_SECS, ha_strategy=HA_STRATEGY_RANDOM): with __cache_discovery_clients_lock: assert __cache_key not in __cache_discovery_clients, "Client has already been initialized." cli = DiscoveryClient(eureka_server, regions=regions, renewal_interval_in_secs=renewal_interval_in_secs, ha_strategy=ha_strategy) cli.start() __cache_discovery_clients[__cache_key] = cli return cli def get_discovery_client(): # type: (str) -> DiscoveryClient with __cache_discovery_clients_lock: if __cache_key in __cache_discovery_clients: return __cache_discovery_clients[__cache_key] else: return None def init(eureka_server=_DEFAULT_EUREKA_SERVER_URL, regions=[], app_name="", instance_id="", instance_host="", instance_ip="", instance_port=_DEFAULT_INSTNACE_PORT, instance_unsecure_port_enabled=True, instance_secure_port=_DEFAULT_INSTNACE_SECURE_PORT, instance_secure_port_enabled=False, countryId=1, # @deprecaded data_center_name=_DEFAULT_DATA_CENTER_INFO, # Netflix, Amazon, MyOwn renewal_interval_in_secs=_RENEWAL_INTERVAL_IN_SECS, duration_in_secs=_DURATION_IN_SECS, home_page_url="", status_page_url="", health_check_url="", vip_adr="", secure_vip_addr="", is_coordinating_discovery_server=False, ha_strategy=HA_STRATEGY_RANDOM): registry_client = init_registry_client(eureka_server=eureka_server, app_name=app_name, instance_id=instance_id, instance_host=instance_host, instance_ip=instance_ip, instance_port=instance_port, instance_unsecure_port_enabled=instance_unsecure_port_enabled, instance_secure_port=instance_secure_port, instance_secure_port_enabled=instance_secure_port_enabled, countryId=countryId, data_center_name=data_center_name, renewal_interval_in_secs=renewal_interval_in_secs, duration_in_secs=duration_in_secs, home_page_url=home_page_url, status_page_url=status_page_url, health_check_url=health_check_url, vip_adr=vip_adr, secure_vip_addr=secure_vip_addr, is_coordinating_discovery_server=is_coordinating_discovery_server) discovery_client = init_discovery_client(eureka_server, regions=regions, renewal_interval_in_secs=renewal_interval_in_secs, ha_strategy=ha_strategy) return registry_client, discovery_client def walk_nodes_async(app_name="", service="", prefer_ip=False, prefer_https=False, walker=None, on_success=None, on_error=None): cli = get_discovery_client() if cli is None: raise Exception("Discovery Client has not initialized. ") cli.walk_nodes_async(app_name=app_name, service=service, prefer_ip=prefer_ip, prefer_https=prefer_https, walker=walker, on_success=on_success, on_error=on_error) def walk_nodes(app_name="", service="", prefer_ip=False, prefer_https=False, walker=None): cli = get_discovery_client() if cli is None: raise Exception("Discovery Client has not initialized. ") return cli.walk_nodes(app_name=app_name, service=service, prefer_ip=prefer_ip, prefer_https=prefer_https, walker=walker) def do_service_async(app_name="", service="", return_type="string", prefer_ip=False, prefer_https=False, on_success=None, on_error=None, method="GET", headers=None, data=None, timeout=_DEFAULT_TIME_OUT, cafile=None, capath=None, cadefault=False, context=None): cli = get_discovery_client() if cli is None: raise Exception("Discovery Client has not initialized. ") cli.do_service_async(app_name=app_name, service=service, return_type=return_type, prefer_ip=prefer_ip, prefer_https=prefer_https, on_success=on_success, on_error=on_error, method=method, headers=headers, data=data, timeout=timeout, cafile=cafile, capath=capath, cadefault=cadefault, context=context) def do_service(app_name="", service="", return_type="string", prefer_ip=False, prefer_https=False, method="GET", headers=None, data=None, timeout=_DEFAULT_TIME_OUT, cafile=None, capath=None, cadefault=False, context=None): cli = get_discovery_client() if cli is None: raise Exception("Discovery Client has not initialized. ") return cli.do_service(app_name=app_name, service=service, return_type=return_type, prefer_ip=prefer_ip, prefer_https=prefer_https, method=method, headers=headers, data=data, timeout=timeout, cafile=cafile, capath=capath, cadefault=cadefault, context=context) def stop(): register_cli = get_registry_client() if register_cli is not None: register_cli.stop() discovery_client = get_discovery_client() if discovery_client is not None: discovery_client.stop() @atexit.register def _cleanup_before_exist(): if len(__cache_registry_clients) > 0: logger.debug("cleaning up registry clients") for k, cli in __cache_registry_clients.items(): logger.debug( "try to stop cache registry client [%s] this will also unregister this client from the eureka server" % k) cli.stop() if len(__cache_discovery_clients) > 0: logger.debug("cleaning up discovery clients") for k, cli in __cache_discovery_clients.items(): logger.debug( "try to stop cache discovery client [%s] this will also unregister this client from the eureka server" % k) cli.stop() PK!^n Base Mehods <======================================== ### ### Beans ### class Applications: def __init__(self, apps__hashcode="", versions__delta="", applications=None): self.apps__hashcode = apps__hashcode self.versions__delta = versions__delta self.__applications = applications if applications is not None else [] self.__application_name_dic = {} self.__app_lock = Lock() @property def appsHashcode(self): return self.apps__hashcode @property def applications(self): return self.__applications @property def versionsDelta(self): return self.versions__delta def add_application(self, application): with self.__app_lock: self.__applications.append(application) self.__application_name_dic[application.name] = application def get_application(self, app_name): with self.__app_lock: if app_name in self.__application_name_dic: return self.__application_name_dic[app_name] else: return Application(name=app_name) class Application: def __init__(self, name="", instances=None): self.name = name self.__instances = instances if instances is not None else [] self.__instances_dict = {} self.__inst_lock = Lock() @property def instances(self): with self.__inst_lock: return self.__instances @property def up_instances(self): with self.__inst_lock: up_inst = [] for item in self.__instances: if item.status == INSTANCE_STATUS_UP: up_inst.append(item) return up_inst def get_instance(self, instance_id): with self.__inst_lock: if instance_id in self.__instances_dict: return self.__instances_dict[instance_id] else: return None def add_instance(self, instance): with self.__inst_lock: self.__instances.append(instance) self.__instances_dict[instance.instanceId] = instance def update_instance(self, instance): with self.__inst_lock: logger.debug("update instance %s" % instance.instanceId) updated = False for idx in range(len(self.__instances)): ele = self.__instances[idx] if ele.instanceId == instance.instanceId: logger.debug("updating index %d" % idx) self.__instances[idx] = instance updated = True break if not updated: self.add_instance(instance) def remove_instance(self, instance): with self.__inst_lock: for idx in range(len(self.__instances)): ele = self.__instances[idx] if ele.instanceId == instance.instanceId: del self.__instances[idx] break if instance.instanceId in self.__instances_dict: del self.__instances_dict[instance.instanceId] class LeaseInfo: def __init__(self, renewalIntervalInSecs=_RENEWAL_INTERVAL_IN_SECS, durationInSecs=_DURATION_IN_SECS, registrationTimestamp=0, lastRenewalTimestamp=0, renewalTimestamp=0, evictionTimestamp=0, serviceUpTimestamp=0): self.renewalIntervalInSecs = renewalIntervalInSecs self.durationInSecs = durationInSecs self.registrationTimestamp = registrationTimestamp self.lastRenewalTimestamp = lastRenewalTimestamp self.renewalTimestamp = renewalTimestamp self.evictionTimestamp = evictionTimestamp self.serviceUpTimestamp = serviceUpTimestamp class DataCenterInfo: def __init__(self, name=_DEFAULT_DATA_CENTER_INFO, # Netflix, Amazon, MyOwn className=_DEFAULT_DATA_CENTER_INFO_CLASS): self.name = name self.className = className class PortWrapper: def __init__(self, port=0, enabled=False): self.port = port self.enabled = enabled class Instance: def __init__(self, instanceId="", sid="", # @deprecated app="", appGroupName="", ipAddr="", port=PortWrapper(port=_DEFAULT_INSTNACE_PORT, enabled=True), securePort=PortWrapper(port=_DEFAULT_INSTNACE_SECURE_PORT, enabled=False), homePageUrl="", statusPageUrl="", healthCheckUrl="", secureHealthCheckUrl="", vipAddress="", secureVipAddress="", countryId=1, dataCenterInfo=DataCenterInfo(), hostName="", status="", # UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN overriddenstatus="", # UP, DOWN, STARTING, OUT_OF_SERVICE, UNKNOWN leaseInfo=LeaseInfo(), isCoordinatingDiscoveryServer=False, metadata=None, lastUpdatedTimestamp=0, lastDirtyTimestamp=0, actionType=ACTION_TYPE_ADDED, # ADDED, MODIFIED, DELETED asgName=""): self.instanceId = instanceId self.sid = sid self.app = app self.appGroupName = appGroupName self.ipAddr = ipAddr self.port = port self.securePort = securePort self.homePageUrl = homePageUrl self.statusPageUrl = statusPageUrl self.healthCheckUrl = healthCheckUrl self.secureHealthCheckUrl = secureHealthCheckUrl self.vipAddress = vipAddress self.secureVipAddress = secureVipAddress self.countryId = countryId self.dataCenterInfo = dataCenterInfo self.hostName = hostName self.status = status self.overriddenstatus = overriddenstatus self.leaseInfo = leaseInfo self.isCoordinatingDiscoveryServer = isCoordinatingDiscoveryServer self.metadata = metadata if metadata is not None else {} self.lastUpdatedTimestamp = lastUpdatedTimestamp self.lastDirtyTimestamp = lastDirtyTimestamp self.actionType = actionType self.asgName = asgName ########################## Basic functions ################################# ####### Registry functions ######### def register(eureka_server, instance): instance_dic = { 'instanceId': instance.instanceId, 'hostName': instance.hostName, 'app': instance.app, 'ipAddr': instance.ipAddr, 'status': instance.status, 'overriddenstatus': instance.overriddenstatus, 'port': { '$': instance.port.port, '@enabled': str(instance.port.enabled).lower() }, 'securePort': { '$': instance.securePort.port, '@enabled': str(instance.securePort.enabled).lower() }, 'countryId': instance.countryId, 'dataCenterInfo': { '@class': instance.dataCenterInfo.className, 'name': instance.dataCenterInfo.name }, 'leaseInfo': { 'renewalIntervalInSecs': instance.leaseInfo.renewalIntervalInSecs, 'durationInSecs': instance.leaseInfo.durationInSecs, 'registrationTimestamp': instance.leaseInfo.registrationTimestamp, 'lastRenewalTimestamp': instance.leaseInfo.lastRenewalTimestamp, 'evictionTimestamp': instance.leaseInfo.evictionTimestamp, 'serviceUpTimestamp': instance.leaseInfo.serviceUpTimestamp }, 'metadata': instance.metadata, 'homePageUrl': instance.homePageUrl, 'statusPageUrl': instance.statusPageUrl, 'healthCheckUrl': instance.healthCheckUrl, 'vipAddress': instance.vipAddress, 'secureVipAddress': instance.secureVipAddress, 'lastUpdatedTimestamp': str(instance.lastUpdatedTimestamp), 'lastDirtyTimestamp': str(instance.lastDirtyTimestamp), 'isCoordinatingDiscoveryServer': str(instance.isCoordinatingDiscoveryServer).lower() } _register(eureka_server, instance_dic) def _register(eureka_server, instance_dic): url = f'{_format_url(eureka_server)}apps/v2/{instance_dic["app"]}/{instance_dic["instanceId"]}' res = pool.request( 'POST', url, body=json.dumps({"instance": instance_dic}).encode(_DEFAULT_ENCODING), headers={'Content-Type': 'application/json'}, timeout=_DEFAULT_TIME_OUT ) # print(res.data) # req = _urllib2.Request(url) # req.add_header('Content-Type', 'application/json') # req.get_method = lambda: "POST" # response = _urllib2.urlopen(req, json.dumps({"instance": instance_dic}).encode(_DEFAULT_ENCODING), # timeout=_DEFAULT_TIME_OUT) # response.close() def cancel(eureka_server, app_name, instance_id): url = f"""{_format_url(eureka_server)}apps/{app_name}/{instance_id}""" res = pool.request('DELETE', url, timeout=_DEFAULT_TIME_OUT) # print(res.data) # req = _urllib2.Request(url) # req.get_method = lambda: "DELETE" # response = _urllib2.urlopen(req, timeout=_DEFAULT_TIME_OUT) # response.close() def send_heart_beat(eureka_server, app_name, instance_id, last_dirty_timestamp, status=INSTANCE_STATUS_UP, overriddenstatus=""): url = f"""{_format_url(eureka_server)}apps/{app_name}/{instance_id}?status={status}&lastDirtyTimestamp={str( last_dirty_timestamp)}""" # url = _format_url(eureka_server) + "apps/%s/%s?status=%s&lastDirtyTimestamp=%s" % \ # (app_name, instance_id, status, str(last_dirty_timestamp)) logger.debug("heartbeat url::" + url) if overriddenstatus != "": url += "&overriddenstatus=" + overriddenstatus res = pool.request('PUT', url, timeout=_DEFAULT_TIME_OUT) # print(res.data) # req = _urllib2.Request(url) # req.get_method = lambda: "PUT" # response = _urllib2.urlopen(req, timeout=_DEFAULT_TIME_OUT) # response.close() def status_update(eureka_server, app_name, instance_id, last_dirty_timestamp, status): url = f"""{_format_url(eureka_server)}apps/{app_name}/{instance_id}?status={status}&lastDirtyTimestamp={str( last_dirty_timestamp)}""" res = pool.request('PUT', url, timeout=_DEFAULT_TIME_OUT) # print(res.data) # url = _format_url(eureka_server) + "apps/%s/%s?status=%s&lastDirtyTimestamp=%s" % \ # (app_name, instance_id, status, str(last_dirty_timestamp)) # # req = _urllib2.Request(url) # req.get_method = lambda: "PUT" # response = _urllib2.urlopen(req, timeout=_DEFAULT_TIME_OUT) # response.close() def delete_status_override(eureka_server, app_name, instance_id, last_dirty_timestamp): url = f"""{_format_url(eureka_server)}apps/{app_name}/{instance_id}/status?lastDirtyTimestamp={str( last_dirty_timestamp)}""" res = pool.request('DELETE', url, timeout=_DEFAULT_TIME_OUT) # print(res.data) # url = _format_url(eureka_server) + "apps/%s/%s/status?lastDirtyTimestamp=%s" % \ # (app_name, instance_id, str(last_dirty_timestamp)) # # req = _urllib2.Request(url) # req.get_method = lambda: "DELETE" # response = _urllib2.urlopen(req, timeout=_DEFAULT_TIME_OUT) # response.close() ####### Discovory functions ######## def get_applications(eureka_server, regions=[]): return _get_applications_(_format_url(eureka_server) + "apps/", regions) def _format_url(url): if url.endswith('/'): return url else: return url + "/" def _get_applications_(url, regions=[]): _url = url if len(regions) > 0: _url = _url + ("&" if "?" in _url else "?") + "regions=" + (",".join(regions)) res = pool.request( method='GET', url=_url, timeout=_DEFAULT_TIME_OUT ) # print(res.data) txt = str(res.data, encoding=_DEFAULT_ENCODING) # f = _urllib2.urlopen(_url, timeout=_DEFAULT_TIME_OUT) # txt = f.read().decode(_DEFAULT_ENCODING) # f.close() return _build_applications(ElementTree.fromstring(txt)) def _build_applications(xml_node): if xml_node.tag != "applications": return None applications = Applications() for child_node in xml_node.getchildren(): if child_node.tag == "versions__delta" and child_node.text is not None: applications.versions__delta = child_node.text elif child_node.tag == "apps__hashcode" and child_node.text is not None: applications.apps__hashcode = child_node.text elif child_node.tag == "application": applications.add_application(_build_application(child_node)) return applications def _build_application(xml_node): if xml_node.tag != "application": return None application = Application() for child_node in xml_node: if child_node.tag == "name": application.name = child_node.text elif child_node.tag == "instance": application.add_instance(_build_instance(child_node)) return application def _build_instance(xml_node): if xml_node.tag != "instance": return None instance = Instance() for child_node in xml_node: if child_node.tag == "instanceId": instance.instanceId = child_node.text elif child_node.tag == "sid": instance.sid = child_node.text elif child_node.tag == "app": instance.app = child_node.text elif child_node.tag == "appGroupName": instance.appGroupName = child_node.text elif child_node.tag == "ipAddr": instance.ipAddr = child_node.text elif child_node.tag == "port": instance.port = _build_port(child_node) elif child_node.tag == "securePort": instance.securePort = _build_port(child_node) elif child_node.tag == "homePageUrl": instance.homePageUrl = child_node.text elif child_node.tag == "statusPageUrl": instance.statusPageUrl = child_node.text elif child_node.tag == "healthCheckUrl": instance.healthCheckUrl = child_node.text elif child_node.tag == "secureHealthCheckUrl": instance.secureHealthCheckUrl = child_node.text elif child_node.tag == "vipAddress": instance.vipAddress = child_node.text elif child_node.tag == "secureVipAddress": instance.secureVipAddress = child_node.text elif child_node.tag == "countryId": instance.countryId = int(child_node.text) elif child_node.tag == "dataCenterInfo": instance.dataCenterInfo = DataCenterInfo(name=child_node.text, className=child_node.attrib["class"]) elif child_node.tag == "hostName": instance.hostName = child_node.text elif child_node.tag == "status": instance.status = child_node.text elif child_node.tag == "overriddenstatus": instance.overriddenstatus = child_node.text elif child_node.tag == "leaseInfo": instance.leaseInfo = _build_lease_info(child_node) elif child_node.tag == "isCoordinatingDiscoveryServer": instance.isCoordinatingDiscoveryServer = (child_node.text == "true") elif child_node.tag == "metadata": instance.metadata = _build_metadata(child_node) elif child_node.tag == "lastUpdatedTimestamp": instance.lastUpdatedTimestamp = long(child_node.text) elif child_node.tag == "lastDirtyTimestamp": instance.lastDirtyTimestamp = long(child_node.text) elif child_node.tag == "actionType": instance.actionType = child_node.text elif child_node.tag == "asgName": instance.asgName = child_node.text return instance def _build_metadata(xml_node): metadata = {} for child_node in xml_node.getchildren(): metadata[child_node.tag] = child_node.text return metadata def _build_lease_info(xml_node): leaseInfo = LeaseInfo() for child_node in xml_node.getchildren(): if child_node.tag == "renewalIntervalInSecs": leaseInfo.renewalIntervalInSecs = int(child_node.text) elif child_node.tag == "durationInSecs": leaseInfo.durationInSecs = int(child_node.text) elif child_node.tag == "registrationTimestamp": leaseInfo.registrationTimestamp = long(child_node.text) elif child_node.tag == "lastRenewalTimestamp": leaseInfo.lastRenewalTimestamp = long(child_node.text) elif child_node.tag == "renewalTimestamp": leaseInfo.renewalTimestamp = long(child_node.text) elif child_node.tag == "evictionTimestamp": leaseInfo.evictionTimestamp = long(child_node.text) elif child_node.tag == "serviceUpTimestamp": leaseInfo.serviceUpTimestamp = long(child_node.text) return leaseInfo def _build_port(xml_node): port = PortWrapper() port.port = int(xml_node.text) port.enabled = (xml_node.attrib["enabled"] == "true") return port def get_delta(eureka_server, regions=[]): return _get_applications_(_format_url(eureka_server) + "apps/delta", regions) def get_vip(eureka_server, vip, regions=[]): return _get_applications_(_format_url(eureka_server) + "vips/" + vip, regions) def get_secure_vip(eureka_server, svip, regions=[]): return _get_applications_(_format_url(eureka_server) + "svips/" + svip, regions) def get_application(eureka_server, app_name): url = f"""{_format_url(eureka_server)}apps/{app_name}""" res = pool.request( method='GET', url=url, timeout=_DEFAULT_TIME_OUT ) txt = str(res.data, encoding=_DEFAULT_ENCODING) # url = _format_url(eureka_server) + "apps/" + app_name # f = _urllib2.urlopen(url, timeout=_DEFAULT_TIME_OUT) # txt = f.read().decode(_DEFAULT_ENCODING) # f.close() return _build_application(ElementTree.fromstring(txt)) def get_app_instance(eureka_server, app_name, instance_id): return _get_instance_(_format_url(eureka_server) + "apps/%s/%s" % (app_name, instance_id)) def get_instance(eureka_server, instance_id): return _get_instance_(_format_url(eureka_server) + "instances/" + instance_id) def _get_instance_(url): res = pool.request( method='GET', url=url, timeout=_DEFAULT_TIME_OUT ) txt = str(res.data, encoding=_DEFAULT_ENCODING) # f = _urllib2.urlopen(url, timeout=_DEFAULT_TIME_OUT) # txt = f.read().decode(_DEFAULT_ENCODING) # f.close() return _build_instance(ElementTree.fromstring(txt)) def _current_time_millis(): return int(time.time() * 1000) """====================== Registry Client =======================================""" class RegistryClient: """Eureka client for spring cloud""" def __init__(self, eureka_server=_DEFAULT_EUREKA_SERVER_URL, app_name="", instance_id="", instance_host="", instance_ip="", instance_port=_DEFAULT_INSTNACE_PORT, instance_unsecure_port_enabled=True, instance_secure_port=_DEFAULT_INSTNACE_SECURE_PORT, instance_secure_port_enabled=False, countryId=1, # @deprecaded data_center_name=_DEFAULT_DATA_CENTER_INFO, # Netflix, Amazon, MyOwn renewal_interval_in_secs=_RENEWAL_INTERVAL_IN_SECS, duration_in_secs=_DURATION_IN_SECS, home_page_url="", status_page_url="", health_check_url="", vip_adr="", secure_vip_addr="", is_coordinating_discovery_server=False): assert eureka_server is not None and eureka_server != "", "eureka server must be specified." assert app_name is not None and app_name != "", "application name must be specified." assert instance_port > 0, "port is unvalid" self.__net_lock = Lock() self.__eureka_servers = eureka_server.split(",") def try_to_get_client_ip(url): url_addr = _urllib2.get_url_and_basic_auth(url)[0] if instance_host == "" and instance_ip == "": self.__instance_host = self.__instance_ip = RegistryClient.__get_instance_ip(url_addr) elif instance_host != "" and instance_ip == "": self.__instance_host = instance_host if RegistryClient.__is_ip(instance_host): self.__instance_ip = instance_host else: self.__instance_ip = RegistryClient.__get_instance_ip(url_addr) else: self.__instance_host = instance_ip self.__instance_ip = instance_ip self.__try_all_eureka_server(try_to_get_client_ip) self.__instance = { 'instanceId': instance_id if instance_id != "" else f'{(self.__instance_host, app_name.lower(), instance_port)}', 'hostName': self.__instance_host, 'app': app_name.upper(), 'ipAddr': self.__instance_ip, 'port': { '$': instance_port, '@enabled': str(instance_unsecure_port_enabled).lower() }, 'securePort': { '$': instance_secure_port, '@enabled': str(instance_secure_port_enabled).lower() }, 'countryId': countryId, 'dataCenterInfo': { '@class': _DEFAULT_DATA_CENTER_INFO_CLASS, 'name': data_center_name }, 'leaseInfo': { 'renewalIntervalInSecs': renewal_interval_in_secs, 'durationInSecs': duration_in_secs, 'registrationTimestamp': 0, 'lastRenewalTimestamp': 0, 'evictionTimestamp': 0, 'serviceUpTimestamp': 0 }, 'metadata': { 'management.port': str(instance_port) }, 'homePageUrl': RegistryClient.__format_url(home_page_url, self.__instance_host, instance_port), 'statusPageUrl': RegistryClient.__format_url(status_page_url, self.__instance_host, instance_port, "info"), 'healthCheckUrl': RegistryClient.__format_url(health_check_url, self.__instance_host, instance_port, "health"), 'vipAddress': vip_adr if vip_adr != "" else app_name.lower(), 'secureVipAddress': secure_vip_addr if secure_vip_addr != "" else app_name.lower(), 'isCoordinatingDiscoveryServer': str(is_coordinating_discovery_server).lower() } self.__alive = False self.__heart_beat_timer = Timer(renewal_interval_in_secs, self.__heart_beat) self.__heart_beat_timer.daemon = True def __try_all_eureka_server(self, fun): with self.__net_lock: untry_servers = self.__eureka_servers tried_servers = [] ok = False while len(untry_servers) > 0: url = untry_servers[0].strip() try: fun(url) except ( _urllib2.HTTPError, _urllib2.URLError, urllib3.exceptions.HTTPError, urllib3.exceptions.RequestError, urllib3.exceptions.ResponseError) as e: logger.warning(f"Eureka server [{url}] is down, use next url to try.({e})") tried_servers.append(url) untry_servers = untry_servers[1:] raise e else: ok = True break if len(tried_servers) > 0: untry_servers.extend(tried_servers) self.__eureka_servers = untry_servers if not ok: raise _urllib2.URLError("All eureka servers are down!") @staticmethod def __format_url(url, host, port, defalut_ctx=""): if url != "": if url.startswith('http'): _url = url elif url.startswith('/'): _url = 'http://%s:%d%s' % (host, port, url) else: _url = 'http://%s:%d/%s' % (host, port, url) else: _url = 'http://%s:%d/%s' % (host, port, defalut_ctx) return _url @staticmethod def __is_ip(ip_str): return re.match(r'^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$', ip_str) @staticmethod def __get_instance_ip(eureka_server): _target_ = eureka_server if not _target_.endswith('/'): _target_ += '/' url_obj = urlparse(_target_) _target_ = url_obj.netloc logger.debug("target eureka host::: %s" % _target_) if _target_.find(':') > 0: arr = _target_.split(':') target_ip = arr[0] target_port = int(arr[1]) else: target_ip = _target_ if url_obj.scheme == "http": target_port = 80 elif url_obj.scheme == "https": target_port = 443 else: raise Exception("Cannot parse your eureka url! ") s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect((target_ip, target_port)) ip = s.getsockname()[0] s.close() return ip def register(self, status=INSTANCE_STATUS_UP, overriddenstatus=INSTANCE_STATUS_UNKNOWN): self.__instance["status"] = status self.__instance["overriddenstatus"] = overriddenstatus self.__instance["lastUpdatedTimestamp"] = str(_current_time_millis()) self.__instance["lastDirtyTimestamp"] = str(_current_time_millis()) try: self.__try_all_eureka_server(lambda url: _register(url, self.__instance)) except Exception as e: logger.error(f"error!{e}") else: self.__alive = True def cancel(self): try: self.__try_all_eureka_server(lambda url: cancel(url, self.__instance["app"], self.__instance["instanceId"])) except: logger.error("error!") else: self.__alive = False def send_heart_beat(self, overridden_status=""): try: self.__try_all_eureka_server(lambda url: send_heart_beat(url, self.__instance["app"], self.__instance["instanceId"], self.__instance["lastDirtyTimestamp"], status=self.__instance["status"], overriddenstatus=overridden_status)) except Exception as e: logger.error(f"error!{e.__dict__}") # lost heartbeat info try: self.register() except: # try reg self failed,close this server self.stop() def status_update(self, new_status): self.__instance["status"] = new_status try: self.__try_all_eureka_server( lambda url: status_update(url, self.__instance["app"], self.__instance["instanceId"], self.__instance["lastDirtyTimestamp"], new_status)) except: logger.error("error!") def delete_status_override(self): self.__try_all_eureka_server(lambda url: delete_status_override( url, self.__instance["app"], self.__instance["instanceId"], self.__instance["lastDirtyTimestamp"])) def start(self): logger.debug("start to registry client...") self.register() self.__heart_beat_timer.daemon = True self.__heart_beat_timer.start() def stop(self): if self.__alive: logger.debug("stopping client...") if self.__heart_beat_timer.isAlive(): self.__heart_beat_timer.cancel() self.register(status=INSTANCE_STATUS_DOWN) self.cancel() def __heart_beat(self): logger.debug("sending heart beat to spring cloud server ") self.send_heart_beat() self.__heart_beat_timer = Timer(self.__instance["leaseInfo"]["renewalIntervalInSecs"], self.__heart_beat) self.__heart_beat_timer.daemon = True self.__heart_beat_timer.start() __cache_key = "default" __cache_registry_clients = {} __cache_registry_clients_lock = Lock() def init_registry_client(eureka_server=_DEFAULT_EUREKA_SERVER_URL, app_name="", instance_id="", instance_host="", instance_ip="", instance_port=_DEFAULT_INSTNACE_PORT, instance_unsecure_port_enabled=True, instance_secure_port=_DEFAULT_INSTNACE_SECURE_PORT, instance_secure_port_enabled=False, countryId=1, # @deprecaded data_center_name=_DEFAULT_DATA_CENTER_INFO, # Netflix, Amazon, MyOwn renewal_interval_in_secs=_RENEWAL_INTERVAL_IN_SECS, duration_in_secs=_DURATION_IN_SECS, home_page_url="", status_page_url="", health_check_url="", vip_adr="", secure_vip_addr="", is_coordinating_discovery_server=False): with __cache_registry_clients_lock: client = RegistryClient(eureka_server=eureka_server, app_name=app_name, instance_id=instance_id, instance_host=instance_host, instance_ip=instance_ip, instance_port=instance_port, instance_unsecure_port_enabled=instance_unsecure_port_enabled, instance_secure_port=instance_secure_port, instance_secure_port_enabled=instance_secure_port_enabled, countryId=countryId, data_center_name=data_center_name, renewal_interval_in_secs=renewal_interval_in_secs, duration_in_secs=duration_in_secs, home_page_url=home_page_url, status_page_url=status_page_url, health_check_url=health_check_url, vip_adr=vip_adr, secure_vip_addr=secure_vip_addr, is_coordinating_discovery_server=is_coordinating_discovery_server) __cache_registry_clients[__cache_key] = client client.start() return client def get_registry_client(): # type (str) -> RegistryClient with __cache_registry_clients_lock: if __cache_key in __cache_registry_clients: return __cache_registry_clients[__cache_key] else: return None """======================== Cached Discovery Client ============================""" class DiscoveryClient: """Discover the apps registered in spring cloud server, this class will do some cached, if you want to get the apps immediatly, use the global functions""" def __init__(self, eureka_server, regions=None, renewal_interval_in_secs=_RENEWAL_INTERVAL_IN_SECS, ha_strategy=HA_STRATEGY_RANDOM): assert ha_strategy in [HA_STRATEGY_RANDOM, HA_STRATEGY_STICK, HA_STRATEGY_OTHER], "do not support strategy %d " % ha_strategy self.__eureka_servers = eureka_server.split(",") self.__regions = regions if regions is not None else [] self.__cache_time_in_secs = renewal_interval_in_secs self.__applications = None self.__delta = None self.__ha_strategy = ha_strategy self.__ha_cache = {} self.__timer = Timer(self.__cache_time_in_secs, self.__heartbeat) self.__timer.daemon = True self.__application_mth_lock = Lock() self.__net_lock = Lock() def __heartbeat(self): self.__fetch_delta() self.__timer = Timer(self.__cache_time_in_secs, self.__heartbeat) self.__timer.daemon = True self.__timer.start() @property def applications(self): with self.__application_mth_lock: if self.__applications is None: self.__pull_full_registry() return self.__applications def __try_all_eureka_server(self, fun): with self.__net_lock: untry_servers = self.__eureka_servers tried_servers = [] ok = False while len(untry_servers) > 0: url = untry_servers[0].strip() try: fun(url) except ( _urllib2.HTTPError, _urllib2.URLError, urllib3.exceptions.HTTPError, urllib3.exceptions.RequestError, urllib3.exceptions.ResponseError): logger.warning("Eureka server [%s] is down, use next url to try." % url) tried_servers.append(url) untry_servers = untry_servers[1:] else: ok = True break if len(tried_servers) > 0: untry_servers.extend(tried_servers) self.__eureka_servers = untry_servers if not ok: raise _urllib2.URLError("All eureka servers are down!") def __pull_full_registry(self): def do_pull(url): # the actual function body self.__applications = get_applications(url, self.__regions) self.__delta = self.__applications self.__try_all_eureka_server(do_pull) def __fetch_delta(self): def do_fetch(url): if self.__applications is None or len(self.__applications.applications) == 0: self.__pull_full_registry() return delta = get_delta(url, self.__regions) logger.debug("delta got: v.%s::%s" % (delta.versionsDelta, delta.appsHashcode)) if self.__delta is not None \ and delta.versionsDelta == self.__delta.versionsDelta \ and delta.appsHashcode == self.__delta.appsHashcode: return self.__merge_delta(delta) self.__delta = delta if not self.__is_hash_match(): self.__pull_full_registry() self.__try_all_eureka_server(do_fetch) def __is_hash_match(self): app_hash = self.__get_applications_hash() logger.debug("check hash, local[%s], remote[%s]" % (app_hash, self.__delta.appsHashcode)) return app_hash == self.__delta.appsHashcode def __merge_delta(self, delta): logger.debug("merge delta...length of application got from delta::%d" % len(delta.applications)) for application in delta.applications: for instance in application.instances: logger.debug("instance [%s] has %s" % (instance.instanceId, instance.actionType)) if instance.actionType in (ACTION_TYPE_ADDED, ACTION_TYPE_MODIFIED): existingApp = self.applications.get_application(application.name) if existingApp is None: self.applications.add_application(application) else: existingApp.update_instance(instance) elif instance.actionType == ACTION_TYPE_DELETED: existingApp = self.applications.get_application(application.name) if existingApp is None: self.applications.add_application(application) existingApp.remove_instance(instance) def __get_applications_hash(self): app_hash = "" app_status_count = {} for application in self.__applications.applications: for instance in application.instances: if instance.status not in app_status_count: app_status_count[instance.status.upper()] = 0 app_status_count[instance.status.upper()] = app_status_count[instance.status.upper()] + 1 sorted_app_status_count = sorted(app_status_count.items(), key=lambda item: item[0]) for item in sorted_app_status_count: app_hash = app_hash + "%s_%d_" % (item[0], item[1]) return app_hash def walk_nodes_async(self, app_name="", service="", prefer_ip=False, prefer_https=False, walker=None, on_success=None, on_error=None): def async_thread_target(): try: res = self.walk_nodes(app_name=app_name, service=service, prefer_ip=prefer_ip, prefer_https=prefer_https, walker=walker) if on_success is not None and (inspect.isfunction(on_success) or inspect.ismethod(on_success)): on_success(res) except _urllib2.HTTPError as e: if on_error is not None and (inspect.isfunction(on_error) or inspect.ismethod(on_error)): on_error(e) async_thread = Thread(target=async_thread_target) async_thread.daemon = True async_thread.start() def walk_nodes(self, app_name="", service="", prefer_ip=False, prefer_https=False, walker=None): assert app_name is not None and app_name != "", "application_name should not be null" assert inspect.isfunction(walker) or inspect.ismethod(walker), "walker must be a method or function" error_nodes = [] app_name = app_name.upper() node = self.__get_availabe_service(app_name) while node is not None: try: url = self.__generate_service_url(node, prefer_ip, prefer_https) if service.startswith("/"): url = url + service[1:] else: url = url + service logger.debug("service url::" + url) return walker(url) except ( _urllib2.HTTPError, _urllib2.URLError, urllib3.exceptions.HTTPError, urllib3.exceptions.RequestError, urllib3.exceptions.ResponseError): logger.warning("do service %s in node [%s] error, use next node." % (service, node.instanceId)) error_nodes.append(node.instanceId) node = self.__get_availabe_service(app_name, error_nodes) raise _urllib2.HTTPError(url='service/error', code=500, msg="Try all up instances in registry, but all fail", hdrs=None, fp=None) def do_service_async(self, app_name="", service="", return_type="string", prefer_ip=False, prefer_https=False, on_success=None, on_error=None, method="GET", headers=None, data=None, timeout=_DEFAULT_TIME_OUT, cafile=None, capath=None, cadefault=False, context=None): def async_thread_target(): try: res = self.do_service(app_name=app_name, service=service, return_type=return_type, prefer_ip=prefer_ip, prefer_https=prefer_https, method=method, headers=headers, data=data, timeout=timeout, cafile=cafile, capath=capath, cadefault=cadefault, context=context) if on_success is not None and (inspect.isfunction(on_success) or inspect.ismethod(on_success)): on_success(res) except _urllib2.HTTPError as e: if on_error is not None and (inspect.isfunction(on_error) or inspect.ismethod(on_error)): on_error(e) async_thread = Thread(target=async_thread_target) async_thread.daemon = True async_thread.start() def do_service(self, app_name="", service="", return_type="string", prefer_ip=False, prefer_https=False, method="GET", headers=None, data=None, timeout=_DEFAULT_TIME_OUT, cafile=None, capath=None, cadefault=False, context=None): def walk_using_urllib(url): req = _urllib2.Request(url) req.get_method = lambda: method heads = headers if headers is not None else {} for k, v in heads.items(): req.add_header(k, v) response = _urllib2.urlopen(req, data=data, timeout=timeout, cafile=cafile, capath=capath, cadefault=cadefault, context=context) res_txt = response.read().decode(_DEFAULT_ENCODING) response.close() if return_type.lower() in ("json", "dict", "dictionary"): return json.loads(res_txt) else: return res_txt return self.walk_nodes(app_name, service, prefer_ip, prefer_https, walk_using_urllib) def __get_availabe_service(self, application_name, ignore_instance_ids=None): app = self.applications.get_application(application_name) if app is None: return None up_instances = [] if ignore_instance_ids is None or len(ignore_instance_ids) == 0: up_instances.extend(app.up_instances) else: for ins in app.up_instances: if ins.instanceId not in ignore_instance_ids: up_instances.append(ins) if len(up_instances) == 0: # no up instances return None elif len(up_instances) == 1: # only one available instance, then doesn't matter which strategy is. instance = up_instances[0] self.__ha_cache[application_name] = instance.instanceId return instance def random_one(instances): if len(instances) == 1: idx = 0 else: idx = random.randint(0, len(instances) - 1) selected_instance = instances[idx] self.__ha_cache[application_name] = selected_instance.instanceId return selected_instance if self.__ha_strategy == HA_STRATEGY_RANDOM: return random_one(up_instances) elif self.__ha_strategy == HA_STRATEGY_STICK: if application_name in self.__ha_cache: cache_id = self.__ha_cache[application_name] cahce_instance = app.get_instance(cache_id) if cahce_instance is not None and cahce_instance.status == INSTANCE_STATUS_UP: return cahce_instance else: return random_one(up_instances) else: return random_one(up_instances) elif self.__ha_strategy == HA_STRATEGY_OTHER: if application_name in self.__ha_cache: cache_id = self.__ha_cache[application_name] other_instances = [] for up_instance in up_instances: if up_instance.instanceId != cache_id: other_instances.append(up_instance) return random_one(other_instances) else: return random_one(up_instances) else: return None def __generate_service_url(self, instance, prefer_ip, prefer_https): if instance is None: return None schema = "http" port = 0 if instance.port.port and not instance.securePort.enabled: schema = "http" port = instance.port.port elif not instance.port.port and instance.securePort.enabled: schema = "https" port = instance.securePort.port elif instance.port.port and instance.securePort.enabled: if prefer_https: schema = "https" port = instance.securePort.port else: schema = "http" port = instance.port.port else: assert False, "generate_service_url error: No port is available" host = instance.ipAddr if prefer_ip else instance.hostName return "%s://%s:%d/" % (schema, host, port) def start(self): self.__pull_full_registry() self.__timer.start() def stop(self): if self.__timer.isAlive(): self.__timer.cancel() __cache_discovery_clients = {} __cache_discovery_clients_lock = Lock() def init_discovery_client(eureka_server=_DEFAULT_EUREKA_SERVER_URL, regions=[], renewal_interval_in_secs=_RENEWAL_INTERVAL_IN_SECS, ha_strategy=HA_STRATEGY_RANDOM): with __cache_discovery_clients_lock: assert __cache_key not in __cache_discovery_clients, "Client has already been initialized." cli = DiscoveryClient(eureka_server, regions=regions, renewal_interval_in_secs=renewal_interval_in_secs, ha_strategy=ha_strategy) cli.start() __cache_discovery_clients[__cache_key] = cli return cli def get_discovery_client(): # type: (str) -> DiscoveryClient with __cache_discovery_clients_lock: if __cache_key in __cache_discovery_clients: return __cache_discovery_clients[__cache_key] else: return None def init(eureka_server=_DEFAULT_EUREKA_SERVER_URL, regions=[], app_name="", instance_id="", instance_host="", instance_ip="", instance_port=_DEFAULT_INSTNACE_PORT, instance_unsecure_port_enabled=True, instance_secure_port=_DEFAULT_INSTNACE_SECURE_PORT, instance_secure_port_enabled=False, countryId=1, # @deprecaded data_center_name=_DEFAULT_DATA_CENTER_INFO, # Netflix, Amazon, MyOwn renewal_interval_in_secs=_RENEWAL_INTERVAL_IN_SECS, duration_in_secs=_DURATION_IN_SECS, home_page_url="", status_page_url="", health_check_url="", vip_adr="", secure_vip_addr="", is_coordinating_discovery_server=False, ha_strategy=HA_STRATEGY_RANDOM): registry_client = init_registry_client(eureka_server=eureka_server, app_name=app_name, instance_id=instance_id, instance_host=instance_host, instance_ip=instance_ip, instance_port=instance_port, instance_unsecure_port_enabled=instance_unsecure_port_enabled, instance_secure_port=instance_secure_port, instance_secure_port_enabled=instance_secure_port_enabled, countryId=countryId, data_center_name=data_center_name, renewal_interval_in_secs=renewal_interval_in_secs, duration_in_secs=duration_in_secs, home_page_url=home_page_url, status_page_url=status_page_url, health_check_url=health_check_url, vip_adr=vip_adr, secure_vip_addr=secure_vip_addr, is_coordinating_discovery_server=is_coordinating_discovery_server) discovery_client = init_discovery_client(eureka_server, regions=regions, renewal_interval_in_secs=renewal_interval_in_secs, ha_strategy=ha_strategy) return registry_client, discovery_client def walk_nodes_async(app_name="", service="", prefer_ip=False, prefer_https=False, walker=None, on_success=None, on_error=None): cli = get_discovery_client() if cli is None: raise Exception("Discovery Client has not initialized. ") cli.walk_nodes_async(app_name=app_name, service=service, prefer_ip=prefer_ip, prefer_https=prefer_https, walker=walker, on_success=on_success, on_error=on_error) def walk_nodes(app_name="", service="", prefer_ip=False, prefer_https=False, walker=None): cli = get_discovery_client() if cli is None: raise Exception("Discovery Client has not initialized. ") return cli.walk_nodes(app_name=app_name, service=service, prefer_ip=prefer_ip, prefer_https=prefer_https, walker=walker) def do_service_async(app_name="", service="", return_type="string", prefer_ip=False, prefer_https=False, on_success=None, on_error=None, method="GET", headers=None, data=None, timeout=_DEFAULT_TIME_OUT, cafile=None, capath=None, cadefault=False, context=None): cli = get_discovery_client() if cli is None: raise Exception("Discovery Client has not initialized. ") cli.do_service_async(app_name=app_name, service=service, return_type=return_type, prefer_ip=prefer_ip, prefer_https=prefer_https, on_success=on_success, on_error=on_error, method=method, headers=headers, data=data, timeout=timeout, cafile=cafile, capath=capath, cadefault=cadefault, context=context) def do_service(app_name="", service="", return_type="string", prefer_ip=False, prefer_https=False, method="GET", headers=None, data=None, timeout=_DEFAULT_TIME_OUT, cafile=None, capath=None, cadefault=False, context=None): cli = get_discovery_client() if cli is None: raise Exception("Discovery Client has not initialized. ") return cli.do_service(app_name=app_name, service=service, return_type=return_type, prefer_ip=prefer_ip, prefer_https=prefer_https, method=method, headers=headers, data=data, timeout=timeout, cafile=cafile, capath=capath, cadefault=cadefault, context=context) def stop(): register_cli = get_registry_client() if register_cli is not None: register_cli.stop() discovery_client = get_discovery_client() if discovery_client is not None: discovery_client.stop() @atexit.register def _cleanup_before_exist(): if len(__cache_registry_clients) > 0: logger.debug("cleaning up registry clients") for k, cli in __cache_registry_clients.items(): logger.debug( "try to stop cache registry client [%s] this will also unregister this client from the eureka server" % k) cli.stop() if len(__cache_discovery_clients) > 0: logger.debug("cleaning up discovery clients") for k, cli in __cache_discovery_clients.items(): logger.debug( "try to stop cache discovery client [%s] this will also unregister this client from the eureka server" % k) cli.stop() PK!Nsdophon_cloud/utils/__init__.py# 可以封装成函数,方便 Python 的程序调用 import socket def get_host_ip(): try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] finally: s.close() return ip PK!f+(($dophon_cloud-1.0.3.dist-info/LICENSEApache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: You must give any other recipients of the Work or Derivative Works a copy of this License; and You must cause any modified files to carry prominent notices stating that You changed the files; and You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "{}" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright 2018 CallMeE Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.PK!H|n-WY"dophon_cloud-1.0.3.dist-info/WHEEL A н#Z;/" bFF]xzwK;<*mTֻ0*Ri.4Vm0[H, JPK!Hae%dophon_cloud-1.0.3.dist-info/METADATAN0E /iJHh ?=P;x,=F xAن/eZ0M. g^sYd #tFfr>rh#u*޵(ï:o@=n n8vcVa !+aS!Jyor>OӲB.EqYl5{DOvǾ"aLPK!Hwa#dophon_cloud-1.0.3.dist-info/RECORDIs@{~ f9EQpP(-_?I&K}1RJSI5 3a8I#^pPLU*6|y5-LN6=3KaFêH̏w?S%0Oj|NUK߭lpnʞ$NA?QҴ8 LDl/[V2u9-.M7o6Ƨ:jqwN6^xﶾCvar(%'(Vq5 Ij\D a[CL[<7U5,{L!#wuI+Lªᩓ8âzW~i;(8ruy5,8ꡱDCol7`vY2UQs2;  Rum1i Pnv5}L #I '<Ǻ6+>$"z{v.wV#:<>.a%e !#'F`$iٌ5בA6e܊9Ooq19ϯE6E Qַ3{&Z.>>;xZ_g) Q3 D:Q=޵ju}bJ}\)aVݛA-(1H_y PK!/uydophon_cloud/__init__.pyPK!s)3_;_;$Kdophon_cloud/enhance_pkg/__init__.pyPK!!>dophon_cloud/enhance_pkg/const.pyPK!L$FF++?dophon_cloud/enhance_pkg/decrate_enhance.pyPK!\; &dophon_cloud/eureka_client/__init__.pyPK!m(/Jdophon_cloud/eureka_client/__urlopen_proxy__.pyPK!+Bdophon_cloud/eureka_client/eureka_client.pyPK!^n