PK!tdophon_cloud_center/__init__.pyfrom dophon_cloud_center import reg_center from dophon import logger logger.inject_logger(globals()) def active_center(prop_obj: dict, singleton: bool = True, clusters: int = 0): def inj_method(f): def inj_args(*args, **kwargs): # 前期设置 f(*args, **kwargs) if singleton: reg_center.run_singleton(properties=prop_obj) else: if clusters < 2: raise Exception('集群数异常: %s' % clusters) else: c_props = [] for next_port in range(clusters): clusters_prop = prop_obj.copy() clusters_prop['port'] = int(prop_obj['port']) + next_port c_props.append(clusters_prop) reg_center.run_clusters(properties_list=c_props) return inj_args return inj_method PK!TS  $dophon_cloud_center/kits/__init__.pyimport hashlib import schedule import urllib3 import time import json from dophon import logger logger.inject_logger(globals()) _sech = schedule.Scheduler() req_pool = urllib3.PoolManager() start_flag=False class ServiceCell(): __host = None __port = None __heart_interface = None __id = None def __init__(self, host, port, hear_url='/heart'): self.__host = host self.__port = port self.__heart_interface = hear_url self.__id = hashlib.sha1( (str(self.__host) + str(self.__port) + str(self.__heart_interface)).encode('utf8')).hexdigest() def __dict__(self): return { 'id': self.__id, 'host': self.__host, 'port': self.__port, 'heart_interface': self.__heart_interface } def __eq__(self, other): return self.__id == other['id'] class HeartSech(): def __init__(self, time): self.__reg={} _sech.every(time).seconds.do(self.heart_beat) ''' 心跳一跳操作 ''' def heart_beat(self): try: # 获取每个服务 # print(self.__reg) for service_name in self.__reg: if service_name == 'DOPHON_REG_CENTER_CLUSTERS': # 跳过注册中心集群配置 continue # 获取每个服务的实例列表 services = self.__reg[service_name] # print(services) services_copy = services for instence in services: # 获取每个具体实例 ip_addr = str(instence['host']).split(':')[0] url = 'http://' + ip_addr + ':' + instence['port'] + instence[ 'heart_interface'] + '/as/' + service_name logger.info('%s reg_info: %s' % (id(self),str(self.__reg))) logger.info('发送心跳: %s' % (url)) try: res = req_pool.request(method='POST', url=url, body=bytes(json.dumps(self.__reg), encoding='utf8'), headers={ 'Content-Type': 'application/json' }) res_result = eval(res.data.decode('utf8')) if res_result['event'] == 404: # 实例注册信息有误 services_copy.remove(instence) self.__reg[service_name] = services_copy except Exception as e: # 连接失败打印错误信息 logger.info( '%s \n %s \n %s:%s \n连接失败,移除对应实例' % (e, service_name, ip_addr, instence['port']) ) services_copy.remove(instence) self.__reg[service_name] = services_copy if 0 >= len(services): self.__reg.pop(service_name) if 0 >= len(self.__reg): break; except Exception as e: print(e) # print(self.__reg) def update_reg(self, reg_info): self.__reg = reg_info def start_heart(round_sec: int): global start_flag if start_flag: return else: start_flag=True while True: _sech.run_all() time.sleep(round_sec) def get_heart(sech_time: int = 15) -> HeartSech: """ :param sech_time: 心跳检查时间,默认15秒,最低15秒(线程等待时间) """ if sech_time<15: sech_time = 15 return HeartSech(sech_time) PK!I##!dophon_cloud_center/reg_center.pyimport socket from flask import Flask, render_template, redirect, request, jsonify, abort from dophon_cloud_center import kits import threading import urllib3 from flask_bootstrap import Bootstrap import time from dophon import logger logger.inject_logger(globals()) config = ['DOPHON_REG_CENTER_CLUSTERS'] # 公共配置 instance_cache = [] # 实例列表 clusters = {} # 集群信息 req_pool = urllib3.PoolManager() """ 注册中心 默认端口为8361 """ class Center(Flask): @property def reg_info(self): return self._reg_info @reg_info.setter def setter_reg_info(self, value: dict): self._reg_info = value @reg_info.getter def getter_reg_info(self): return self._reg_info def __init__( self, config: dict = {}, *args, **kwargs ): super(Center, self).__init__(*args, **kwargs) # 配置编码 self.config['JSON_AS_ASCII'] = False # h5添加bootstrap样式 Bootstrap(self) # 写入配置 self._center_config = config self._heart_cell = kits.get_heart() # 类内属性 self._reg_info = {} self.__self_reg_info = {} # 绑定首页路由(重定向) self.add_url_rule('/', 'hello_world', self.hello_world, methods=['get', 'post']) # 绑定注册中心信息路由 self.add_url_rule('/center', 'center', self.center, methods=['get', 'post']) # 绑定查看实例信息路由 self.add_url_rule('/request//', 'request_instance', self.request_instance) # 绑定注册服务路由 self.add_url_rule('/reg/service/', 'reg_service', self.reg_service, methods=['get']) # 绑定查询中心健康状态路由 self.add_url_rule('/health', 'health', self.health, methods=['get']) # 绑定注册实例列表更新接口路由 self.add_url_rule('/reg/update', 'get_reg_info', self.get_reg_info, methods=['get']) def run(self, host=None, port=None, debug=None, load_dotenv=True, **options): # 保存地址信息 self._addr_info = { 'host': get_host_ip(), 'port': port } super(Center, self).run(host=host, port=port, debug=debug, load_dotenv=load_dotenv, **options) def hello_world(self): return redirect('/center', ) def center(self): m = request.method view_data = self._reg_info.copy() # 消除内部功能参数 for k in config: if k in view_data: view_data.pop(k) if m == 'GET': return render_template('center.html', reg_info=view_data) if m == 'POST': return jsonify(view_data) return abort(400) def request_instance(self, service_name, instance_id): if service_name in self._reg_info.keys(): s_instances = self._reg_info[service_name] for instance in s_instances: if instance_id == instance['id']: res = req_pool.request('POST', 'http://' + str(instance['host'] + ':' + instance['port'] + '/heart')) if 200 == res.status: return jsonify(eval(res.data)) else: return abort(404) else: return abort(404) def reg_service(self, name): logger.info('ri: %s sri:%s' % (str(id(self.reg_info)),str(id(self.__self_reg_info)))) logger.info('%s' % (self._center_config['broadcast_heartbeat'])) reg_info = self._reg_info if self._center_config['broadcast_heartbeat'] else self.__self_reg_info reg_info_cache = self._reg_info # 处理多实例服务注册 if name.upper() in reg_info_cache: cache = reg_info_cache[name.upper()] else: cache = [] h = request.headers # 组装服务细胞信息 if 'prefer_ip' in h: ip = h['prefer_ip'] else: ip = str(h['Host']).split(':')[0] if 'u_heart_interface' in h: heart = h['u_heart_interface'] s_c = kits.ServiceCell(ip, h['service_port'], heart) else: s_c = kits.ServiceCell(ip, h['service_port']) for item in cache: if item == s_c: return jsonify(self._reg_info) cache.append(s_c.__dict__()) reg_info_cache[name.upper()] = cache self._reg_info = reg_info_cache if not self._center_config['broadcast_heartbeat']: # 写入自身注册实例 if name.upper() in reg_info: cache = reg_info[name.upper()] else: cache = [] cache.append(s_c.__dict__()) self.__self_reg_info[name.upper()] = cache # print(reg_info_cache) self._heart_cell.update_reg(reg_info) if clusters: # 存在集群信息 self_clusters_info = clusters.copy() self_clusters_info.pop( str(hash(self._addr_info['host'] + ':' + str(self._addr_info['port']))) ) reg_info_cache['DOPHON_REG_CENTER_CLUSTERS'] = self_clusters_info # 广播服务实例信息 for cluster_instance in instance_cache: cluster_instance.sync_reg_info( self._reg_info, str(hash( str(get_host_ip()) + ':' + str(self._addr_info['port']) )) ) return jsonify(reg_info_cache) def health(self): ''' 检查注册中心健康状态 :return: ''' return jsonify({}) def get_reg_info(self): return jsonify(self._reg_info) def get_addr_info(self): """ 返回实例连接信息 :return: """ return self._addr_info # 集群内共享实例信息 def sync_reg_info(self, _reg_info: dict, access_token: str): if access_token in clusters: # 验证发起更新实例 if hash(str(self.reg_info)) == hash(str(_reg_info)): return self._reg_info = _reg_info def active_heart_check(self,round_sec:int=None): threading.Thread( target=self._heart_cell.start_heart, kwargs={ 'round_sec': round_sec if round_sec else 15 } ).start() def get_host_ip(): try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(('8.8.8.8', 80)) ip = s.getsockname()[0] finally: s.close() return ip def run_singleton(properties={}): app = Center( { # 广播式心跳(慎用,会消耗网络资源) -> 向集群内部所有实例广播心跳 'broadcast_heartbeat': bool(properties['broadcast_heartbeat'] if 'broadcast_heartbeat' in properties else False) }, __name__ ) if 'heart_check' in properties.keys() and properties['heart_check']: # app.active_heart_check(properties['heart_round_second']) threading.Thread( target=kits.start_heart, kwargs={ 'round_sec': properties['heart_round_second'] if 'heart_round_second' in properties else 15 } ).start() try: instance_cache.append(app) clusters[ str( hash( str(get_host_ip()) + ':' + str(properties['port']) ) ) ] = { 'host': get_host_ip(), 'port': properties['port'] } app.run( host=properties['host'] if 'host' in properties and properties['host'] else '0.0.0.0', port=properties['port'] if 'port' in properties and properties['port'] else 8361 ) except Exception as e: instance_cache.remove(app) raise e def run_clusters(properties_list: list): # 启动注册中心集群 for properties in properties_list: singleton_thread = threading.Thread(target=run_singleton, kwargs={ 'properties': properties }) singleton_thread.start() while True: if len(instance_cache) == len(properties_list): break time.sleep(0.5) print('集群启动完毕') PK!\\)dophon_cloud_center/templates/center.html {% extends "bootstrap/base.html" %} {% block content %} reg_center
{% for service in reg_info %} {% endfor %}
服务名 服务实例
{{ service }} {% for instance in reg_info[service] %} {{ instance['host'] }}:{{ instance['port'] }}; {% endfor %}
{% endblock %}PK!H\TT)dophon_cloud_center-1.0.1.dist-info/WHEEL 1 0 нR \I$ơ7.ZON `h6oi14m,b4>4ɛpK>X;baP>PK!HQIlw,dophon_cloud_center-1.0.1.dist-info/METADATAJ1&t]BRv-z=@KnwR3wrpѣ' #'rp\F9P/֨Y_g,D2pؓܣqo['ܗ