PK!] synapse_p2p/__init__.py__version__ = "0.1.3" __logo__ = f""" \033[32m ███████╗██╗ ██╗███╗ ██╗ █████╗ ██████╗ ███████╗███████╗ ██╔════╝╚██╗ ██╔╝████╗ ██║██╔══██╗██╔══██╗██╔════╝██╔════╝ ███████╗ ╚████╔╝ ██╔██╗ ██║███████║██████╔╝███████╗█████╗ ╚════██║ ╚██╔╝ ██║╚██╗██║██╔══██║██╔═══╝ ╚════██║██╔══╝ ███████║ ██║ ██║ ╚████║██║ ██║██║ ███████║███████╗ ╚══════╝ ╚═╝ ╚═╝ ╚═══╝╚═╝ ╚═╝╚═╝ ╚══════╝╚══════╝ \033[0m \033[33m⚡ \033[35msynapse \033[36m{__version__}\033[0m """ from synapse_p2p.messages import RemoteProcedureCall from synapse_p2p.server import Server __all__ = ["Server", "RemoteProcedureCall", __logo__] PK!Misynapse_p2p/background.pyimport asyncio import multiprocessing from synapse_p2p.types import BackgroundTask class BackgroundTaskHandler: def __init__(self, n=None): self.cpu_count = n or multiprocessing.cpu_count() self.tasks = [] def schedule_task(self, background_task: BackgroundTask): asyncio.create_task(background_task.callable()) # Schedule recursive background task asyncio.get_event_loop().call_later( background_task.period, self.schedule_task, background_task, ) def add_task(self, task: BackgroundTask): self.tasks.append(task) def __call__(self, *args, **kwargs): for task in self.tasks: self.schedule_task(task) PK!k+//synapse_p2p/exceptions.pyclass InvalidMessageError(Exception): pass PK!X(_rrsynapse_p2p/messages.pyfrom dataclasses import dataclass @dataclass class RemoteProcedureCall: endpoint: str args: list = None PK!ݶKsynapse_p2p/serializers.pyfrom dataclasses import asdict import msgpack from loguru import logger from synapse_p2p import RemoteProcedureCall from synapse_p2p.exceptions import InvalidMessageError class BaseRPCSerializer: @classmethod def serialize(cls, outgoing: RemoteProcedureCall) -> bytes: raise NotImplementedError @classmethod def deserialize(cls, incoming: bytes) -> RemoteProcedureCall: raise NotImplementedError class MessagePackRPCSerializer(BaseRPCSerializer): @classmethod def serialize(cls, outgoing: RemoteProcedureCall) -> bytes: return msgpack.packb(asdict(outgoing)) @classmethod def deserialize(cls, incoming: bytes) -> RemoteProcedureCall: try: return RemoteProcedureCall(**msgpack.unpackb(incoming, raw=False)) except TypeError as e: logger.error(f"Could not deserialize payload to dataclass", bytes) raise InvalidMessageError from e PK!Ʈ synapse_p2p/server.pyimport asyncio from loguru import logger from synapse_p2p import __logo__ from synapse_p2p.background import BackgroundTaskHandler from synapse_p2p.exceptions import InvalidMessageError from synapse_p2p.messages import RemoteProcedureCall from synapse_p2p.serializers import MessagePackRPCSerializer from synapse_p2p.types import Node, build_node_from_peer_name, BackgroundTask class Server: def __init__(self, address="127.0.0.1", port="9999", serializer_class=MessagePackRPCSerializer): self.address = address self.port = port self.namespace = None self.endpoint_directory = {} self.max_upload_size = 4096 self.background_executor = BackgroundTaskHandler() self.serializer_class = serializer_class print(__logo__) def run(self): """ Run the server """ loop = asyncio.get_event_loop() try: asyncio.ensure_future(self.serve()) loop.run_forever() except KeyboardInterrupt: loop.stop() async def handle_data(self, reader, writer): """ Coroutine handler for receiving data, parsing the message and returning a response """ node: Node = build_node_from_peer_name(writer.get_extra_info('peername')) logger.debug(f"Talking to Node {node.identifier} @ {node.ip}:{node.port}") data = await reader.read(self.max_upload_size) try: rpc: RemoteProcedureCall = self.serializer_class.deserialize(data) endpoint = self.endpoint_directory.get(rpc.endpoint) if not endpoint: raise InvalidMessageError(f"Unregistered endpoint called: {rpc.endpoint}") r = await endpoint(*rpc.args, rpc=rpc, node=node, response=writer) if r is None: await writer.drain() except InvalidMessageError: logger.debug(f"Invalid message received", extra={"ip": node.ip, "port": node.port, "raw": data}) writer.write("400".encode()) logger.debug("Closing Connection") writer.close() async def serve(self): """ Attach TCP Stream handler to underlying socket interface """ server = await asyncio.start_server(self.handle_data, self.address, self.port) print(f"Listening on {self.address}:{self.port}") print(f"\nRegistered Endpoints:") for endpoint in self.endpoint_directory: print(f"- {endpoint}") print(f"\nBackground Tasks:") for task in self.background_executor.tasks: print(f"- {task.name} ({task.period}s)") print("\n") self.background_executor() async with server: await server.serve_forever() def endpoint(self, name=None, **options): """ Decorator to mark a method as a UDP Endpoint """ def decorator(wrapped): self.endpoint_directory[name or wrapped.__name__] = wrapped return wrapped return decorator def background(self, period, **options): """ Decorator to schedule a background task periodically """ def decorator(wrapped): self.background_executor.add_task( BackgroundTask(name=wrapped.__name__, callable=wrapped, period=period) ) return wrapped return decorator PK!synapse_p2p/tests/__init__.pyPK!/  synapse_p2p/tests/conftest.pyfrom random import randint import pytest from faker import Faker from synapse_p2p.messages import Intro from synapse_p2p.server import Server from synapse_p2p.types import Node from synapse_p2p.utils import random_hash f = Faker() @pytest.fixture def identifier(): return random_hash() @pytest.fixture def ipv4(): return f.ipv4_public() @pytest.fixture def port(): return randint(2000, 10000) @pytest.fixture def node(identifier, ipv4, port): return { "identifier": identifier, "ip": ipv4, "port": port, } @pytest.fixture def intro(node, identifier): return Intro( identifier=identifier, nodes=[Node(**node) for _ in range(randint(0, 20))], ) @pytest.fixture def server(): return Server("123", "127.0.0.1", 9999) PK!3ܚ#synapse_p2p/tests/test_endpoints.pyimport pytest from synapse_p2p.types import Node class TestIntro: @pytest.mark.asyncio async def test_intro_happy_path(self, server, intro, node): await server.intro(message=intro, caller=Node(**node)) assert len(server.neighborhood) == len(intro.nodes) + 1 PK!3''"synapse_p2p/tests/test_messages.pyfrom synapse_p2p.messages import RemoteProcedureCall def test_rpc_hydrate(): payload = { "endpoint": "my_endpoint", "args": [1, 2], } new = RemoteProcedureCall.hydrate(payload) assert new.endpoint == payload["endpoint"] assert new.args == payload["args"] PK![x722 synapse_p2p/tests/test_server.pyfrom random import randint import msgpack import pytest from faker import Faker from synapse_p2p.types import Node from synapse_p2p.utils import random_hash f = Faker() @pytest.fixture def address(): return f.ipv4(), randint(2000, 60000) def node_factory(identifier=None, ip=None, port=None): return { "identifier": identifier or random_hash(), "ip": ip or f.ipv4(), "port": port or randint(1000, 60000), } @pytest.fixture def valid_intro_message(node): return { "identifier": random_hash(), "nodes": [node() for _ in range(randint(0, 100))], "m": "intro", } @pytest.fixture def valid_ping_message(node): return { "identifier": random_hash(), "m": "ping", } def test_parse_message_intro(server, valid_intro_message, address): new_intro, caller = server.parse_message(msgpack.packb(valid_intro_message), address) assert isinstance(caller, Node) assert isinstance(new_intro, Intro) assert new_intro.identifier is not None assert caller.distance > 0 PK!~bsynapse_p2p/types.pyfrom dataclasses import dataclass from hashlib import sha256 @dataclass class Node: identifier: str ip: str port: int @dataclass class BackgroundTask: name: str callable: callable period: int def get_identifier(peer_name): return sha256(f"{peer_name[0]}:{peer_name[1]}".encode()).hexdigest() def build_node_from_peer_name(peer_name): return Node( identifier=get_identifier(peer_name)[:8], ip=peer_name[0], port=peer_name[1], ) PK!#synapse_p2p/utils.pyimport os from hashlib import sha256 from typing import List from uuid import uuid4 from synapse_p2p.types import Node def guid(): return uuid4().hex def random_hash(): return sha256(os.urandom(16)).hexdigest() def sort_nodes_by_xor_distance(source: int, nodes: List[Node], reverse=False): return sorted(nodes, key=lambda n: int(n.identifier, 16) ^ source, reverse=reverse) def xor_distance(source: int, candidate: str): return source ^ int(candidate, 16) PK!HڽTU!synapse_p2p-0.1.4.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!HT$synapse_p2p-0.1.4.dist-info/METADATAOJ1}Wp2ݠT,-"Xǡb}ɤR7)E7.g=Fi 5gF.'tJ-i~dNk0lpHRv0<"'s VBc:?+ʎusѐUGlHը2(+pDJ0$=U;fiq7˛ yny.hDMeb+z!2PV$:݉|PK!Hه|"synapse_p2p-0.1.4.dist-info/RECORD}ɲ8}n3^`6̆<~"'V_u~Gu}ü0"aI1YsqwMf@VpR=d.T$}C+th&وt8@@p@X+kɲ?^]`%0b_@ưAp. {;3;{Y{ m6~9ѯkP­&/SГ@1r w`hV/1! 9n cOޜ\)k<b.Nf,`*p`o pzEL 9Y/0ƪXpHo1^kd ^ݝ#u2"A˔V Z'ޡ?6I͸hyx ?^*MX^Wj+ ɿW55G1`diu lEngF k-t<(}䊇(%iS8qm/뽟XT|qXGTTuRJ'uO^]гu 9y&}Yټj B~b`grψ1[ '>h~TE10{)0fP *Up~kO%Zo,}]ҰwLW۔^G,-5aLwF_mGaPK!] synapse_p2p/__init__.pyPK!Mi(synapse_p2p/background.pyPK!k+//?synapse_p2p/exceptions.pyPK!X(_rrsynapse_p2p/messages.pyPK!ݶKL synapse_p2p/serializers.pyPK!Ʈ 5 synapse_p2p/server.pyPK!synapse_p2p/tests/__init__.pyPK!/  6synapse_p2p/tests/conftest.pyPK!3ܚ#synapse_p2p/tests/test_endpoints.pyPK!3''"synapse_p2p/tests/test_messages.pyPK![x722 V!synapse_p2p/tests/test_server.pyPK!~b%synapse_p2p/types.pyPK!#'synapse_p2p/utils.pyPK!HڽTU!)synapse_p2p-0.1.4.dist-info/WHEELPK!HT$*synapse_p2p-0.1.4.dist-info/METADATAPK!Hه|"+synapse_p2p-0.1.4.dist-info/RECORDPK.