PK!P#{{pynats/__init__.pyfrom .client import NATSClient, NATSMessage, NATSSubscription __all__ = ("NATSClient", "NATSMessage", "NATSSubscription") PK!cy"y"pynats/client.pyimport io import json import re import socket from dataclasses import dataclass from typing import Callable, Dict, Match, Optional, Pattern, Tuple, Union, cast from urllib.parse import urlparse import pkg_resources from pynats.exceptions import NATSInvalidResponse, NATSUnexpectedResponse from pynats.nuid import NUID __all__ = ("NATSSubscription", "NATSMessage", "NATSClient") INFO_OP = b"INFO" CONNECT_OP = b"CONNECT" PING_OP = b"PING" PONG_OP = b"PONG" SUB_OP = b"SUB" UNSUB_OP = b"UNSUB" PUB_OP = b"PUB" MSG_OP = b"MSG" OK_OP = b"+OK" ERR_OP = b"-ERR" INFO_RE = re.compile(rb"^INFO\s+([^\r\n]+)\r\n") PING_RE = re.compile(rb"^PING\r\n") PONG_RE = re.compile(rb"^PONG\r\n") MSG_RE = re.compile( rb"^MSG\s+(?P[^\s\r\n]+)\s+(?P[^\s\r\n]+)\s+(?P([^\s\r\n]+)[^\S\r\n]+)?(?P\d+)\r\n" # noqa ) OK_RE = re.compile(rb"^\+OK\s*\r\n") ERR_RE = re.compile(rb"^-ERR\s+('.+')?\r\n") _CRLF_ = b"\r\n" _SPC_ = b" " COMMANDS = { INFO_OP: INFO_RE, PING_OP: PING_RE, PONG_OP: PONG_RE, MSG_OP: MSG_RE, OK_OP: OK_RE, ERR_OP: ERR_RE, } INBOX_PREFIX = bytearray(b"_INBOX.") @dataclass class NATSSubscription: sid: int subject: str queue: str callback: Callable max_messages: Optional[int] = None received_messages: int = 0 def is_wasted(self): return ( self.max_messages is not None and self.received_messages == self.max_messages ) @dataclass class NATSMessage: sid: int subject: str reply: str payload: bytes class NATSClient: __slots__ = ( "_conn_options", "_socket", "_socket_file", "_socket_options", "_ssid", "_subs", "_nuid", ) def __init__( self, url: str = "nats://127.0.0.1:4222", *, name: str = "nats-python", verbose: bool = False, pedantic: bool = False, ssl_required: bool = False, socket_timeout: float = None, socket_keepalive: bool = False, ) -> None: parsed = urlparse(url) self._conn_options = { "hostname": parsed.hostname, "port": parsed.port, "username": parsed.username, "password": parsed.password, "name": name, "lang": "python", "protocol": 0, "version": pkg_resources.get_distribution("nats-python").version, "verbose": verbose, "pedantic": pedantic, } self._socket: socket.socket self._socket_file: io.TextIOWrapper self._socket_options = { "timeout": socket_timeout, "keepalive": socket_keepalive, } self._ssid = 0 self._subs: Dict[int, NATSSubscription] = {} self._nuid = NUID() def __enter__(self) -> "NATSClient": self.connect() return self def __exit__(self, type_, value, traceback) -> None: self.close() def connect(self) -> None: sock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM) sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) if self._socket_options["keepalive"]: sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1) sock.settimeout(self._socket_options["timeout"]) sock.connect((self._conn_options["hostname"], self._conn_options["port"])) self._socket_file = sock.makefile("rb") self._socket = sock self._send_connect_command() self._recv(INFO_RE) def close(self) -> None: self._socket_file.close() self._socket.close() def reconnect(self) -> None: self.close() self.connect() def ping(self) -> None: self._send(PING_OP) self._recv(PONG_RE) def subscribe( self, subject: str, *, callback: Callable, queue: str = "", max_messages: Optional[int] = None, ) -> NATSSubscription: sub = NATSSubscription( sid=self._ssid, subject=subject, queue="", callback=callback, max_messages=max_messages, ) self._ssid += 1 self._subs[sub.sid] = sub self._send(SUB_OP, sub.subject, sub.queue, sub.sid) return sub def unsubscribe(self, sub: NATSSubscription) -> None: self._send(UNSUB_OP, sub.sid) self._subs.pop(sub.sid) def auto_unsubscribe(self, sub: NATSSubscription) -> None: if sub.max_messages is None: return self._send(UNSUB_OP, sub.sid, sub.max_messages) def publish(self, subject: str, *, payload: bytes = b"", reply: str = "") -> None: self._send(PUB_OP, subject, reply, len(payload)) self._send(payload) def request(self, subject: str, *, payload: bytes = b"") -> NATSMessage: next_inbox = INBOX_PREFIX[:] next_inbox.extend(self._nuid.next_()) reply_subject = next_inbox.decode() reply_messages: Dict[int, NATSMessage] = {} def callback(message: NATSMessage) -> None: reply_messages[message.sid] = message sub = self.subscribe(reply_subject, callback=callback, max_messages=1) self.auto_unsubscribe(sub) self.publish(subject, payload=payload, reply=reply_subject) self.wait(count=1) return reply_messages[sub.sid] def wait(self, *, count=None) -> None: total = 0 while True: command, result = self._recv(MSG_RE, PING_RE, OK_RE) if command is MSG_RE: self._handle_message(result) total += 1 if count is not None and total >= count: break elif command is PING_RE: self._send(PONG_OP) def _send_connect_command(self) -> None: options = { "name": self._conn_options["name"], "lang": self._conn_options["lang"], "protocol": self._conn_options["protocol"], "version": self._conn_options["version"], "verbose": self._conn_options["verbose"], "pedantic": self._conn_options["pedantic"], } if self._conn_options["username"] and self._conn_options["password"]: options["user"] = self._conn_options["username"] options["pass"] = self._conn_options["password"] elif self._conn_options["username"]: options["auth_token"] = self._conn_options["username"] self._send(CONNECT_OP, json.dumps(options)) def _send(self, *parts: Union[bytes, str, int]) -> None: self._socket.sendall(_SPC_.join(self._encode(p) for p in parts) + _CRLF_) def _encode(self, value: Union[bytes, str, int]) -> bytes: if isinstance(value, bytes): return value elif isinstance(value, str): return value.encode() elif isinstance(value, int): return f"{value:d}".encode() raise RuntimeError(f"got unsupported type for encoding: type={type(value)}") def _recv(self, *commands: Pattern[bytes]) -> Tuple[Pattern[bytes], Match[bytes]]: line = self._readline() command = self._get_command(line) if command not in commands: raise NATSUnexpectedResponse(line) result = command.match(line) if result is None: raise NATSInvalidResponse(line) return command, result def _readline(self, *, size: int = None) -> bytes: read = b"" while True: line = cast(bytes, self._socket_file.readline()) read += line if size is not None: if len(self._strip(read)) == size: break elif line.endswith(_CRLF_): break return read def _strip(self, line: bytes) -> bytes: return line[: -len(_CRLF_)] def _get_command(self, line: bytes) -> Optional[Pattern[bytes]]: values = self._strip(line).split(b" ", 1) return COMMANDS.get(values[0]) def _handle_message(self, result: Match[bytes]) -> None: message_data = result.groupdict() message_payload_size = int(message_data["size"]) message_payload = self._readline(size=message_payload_size) message_payload = self._strip(message_payload) message = NATSMessage( sid=int(message_data["sid"].decode()), subject=message_data["subject"].decode(), reply=message_data["reply"].decode() if message_data["reply"] else "", payload=message_payload, ) sub = self._subs[message.sid] sub.received_messages += 1 if sub.is_wasted(): self._subs.pop(sub.sid) sub.callback(message) PK!pynats/exceptions.py__all__ = ("NATSError", "NATSUnexpectedResponse", "NATSInvalidResponse") class NATSError(Exception): pass class NATSUnexpectedResponse(NATSError): def __init__(self, line: bytes, *args, **kwargs) -> None: self.line = line super().__init__(*args, **kwargs) class NATSInvalidResponse(NATSError): def __init__(self, line: bytes, *args, **kwargs) -> None: self.line = line super().__init__(*args, **kwargs) PK!'pynats/nuid.py# Copyright 2016-2018 The NATS Authors # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # # Code taken from `asyncio-nats` package, for details please follow link below: # https://github.com/nats-io/asyncio-nats/blob/master/nats/aio/nuid.py from random import Random, SystemRandom from sys import maxsize as MaxInt DIGITS = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" BASE = 62 PREFIX_LENGTH = 12 SEQ_LENGTH = 10 TOTAL_LENGTH = PREFIX_LENGTH + SEQ_LENGTH MAX_SEQ = BASE ** 10 MIN_INC = 33 MAX_INC = 333 INC = MAX_INC - MIN_INC class NUID: """ NUID is an implementation of the approach for fast generation of unique identifiers used for inboxes in NATS. """ def __init__(self) -> None: self._srand = SystemRandom() self._prand = Random(self._srand.randint(0, MaxInt)) self._seq = self._prand.randint(0, MAX_SEQ) self._inc = MIN_INC + self._prand.randint(0, INC) self.randomize_prefix() def next_(self) -> bytearray: self._seq += self._inc if self._seq >= MAX_SEQ: self.randomize_prefix() self.reset_sequential() _seq = self._seq prefix = self._prefix[:] def _next(): nonlocal _seq a = DIGITS[int(_seq) % BASE] _seq /= BASE return a suffix = bytearray(_next() for i in range(SEQ_LENGTH)) prefix.extend(suffix) return prefix def randomize_prefix(self) -> None: random_bytes = (self._srand.getrandbits(8) for i in range(PREFIX_LENGTH)) self._prefix = bytearray(DIGITS[c % BASE] for c in random_bytes) def reset_sequential(self) -> None: self._seq = self._prand.randint(0, MAX_SEQ) self._inc = MIN_INC + self._prand.randint(0, INC) PK!=-//#nats_python-0.3.0.dist-info/LICENSEMIT License Copyright (c) 2018 Nikita Grishko Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HlŃTT!nats_python-0.3.0.dist-info/WHEEL A н#J@Z|Jmqvh&#hڭw!Ѭ"J˫( } %PK!H5TS$nats_python-0.3.0.dist-info/METADATAV]o6}ׯs6,F-)iКYdݾBIgHʎ}/))H5x.a֭p e?{\Эx8qh]l5fPh j,M͋:D57$ 0o41ޝơn uXE7u ̱'r0 KP?\}PK!HAx\"nats_python-0.3.0.dist-info/RECORD}Ɏ@@ѽ6-fPfPJ(&Qӱ8C!A՞F0d8b͹r0m&Onm rJxxsDVY35 Ժ`O%.S̉/%P7, _vgk/HvKvFn!:>,JR\%1g烞*$ncgF4@q~<43~>6_+K^qPmw}Sar/kA*ld AR,#}SKn"As}=UйeD{;aDG5oܺfk^&}"#4`*;c@PK!P#{{pynats/__init__.pyPK!cy"y"pynats/client.pyPK!R#pynats/exceptions.pyPK!'I%pynats/nuid.pyPK!=-//#h.nats_python-0.3.0.dist-info/LICENSEPK!HlŃTT!2nats_python-0.3.0.dist-info/WHEELPK!H5TS$k3nats_python-0.3.0.dist-info/METADATAPK!HAx\"7nats_python-0.3.0.dist-info/RECORDPK>T9