PK!waspy/__init__.pyfrom .app import Application from .webtypes import Request, Response, QueryParams from .exceptions import ResponseError, NotRoutableError, ParseError from .configuration import Config from .client import Client PK!waspy/__init__.pyfrom .app import Application from .webtypes import Request, Response, QueryParams from .exceptions import ResponseError, NotRoutableError, ParseError from .configuration import Config from .client import Client PK! QR R waspy/_cors.pyfrom http import HTTPStatus from .configuration import ConfigError from .webtypes import Response, Request from .router import Methods class CORSHandler: __slots__ = ('allowed_origins', 'allowed_headers', 'allowed_methods') """ Abstracts CORS things """ def __init__(self, *, allowed_origins: set, allowed_headers: str, allowed_methods: str): if len(allowed_origins) == 1: self.allowed_origins = allowed_origins.pop() else: self.allowed_origins = allowed_origins self.allowed_headers = allowed_headers self.allowed_methods = allowed_methods def add_cors_headers(self, request: Request, response: Response): if isinstance(self.allowed_origins, str): # only one origin result = self.allowed_origins elif request.headers.get('origin') in self.allowed_origins: result = request.headers.get('origin') else: result = 'none' response.headers['Access-Control-Allow-Origin'] = result response.headers['Access-Control-Allow-Credentials'] = 'true' if request.method == Methods.OPTIONS: if self.allowed_headers: response.headers['Access-Control-Allow-Headers'] = \ self.allowed_headers if self.allowed_methods: response.headers['Access-Control-Allow-Methods'] = \ self.allowed_methods async def options_handler(self, request): return Response(status=HTTPStatus.NO_CONTENT) @staticmethod def from_config(config): try: config['cors']['handle'] except ConfigError: return None origins = config['cors']['allowed_origins'] if not origins: # if empty raise ValueError('Must have at least one cors.allowed_origins' ' in your configuration.') origins = {origin.strip() for origin in origins.split(',')} try: headers = config['cors']['allowed_headers'] except ConfigError: headers = '' try: methods = config['cors']['allowed_methods'] except ConfigError: methods = '' return CORSHandler(allowed_origins=origins, allowed_headers=headers, allowed_methods=methods) PK! QR R waspy/_cors.pyfrom http import HTTPStatus from .configuration import ConfigError from .webtypes import Response, Request from .router import Methods class CORSHandler: __slots__ = ('allowed_origins', 'allowed_headers', 'allowed_methods') """ Abstracts CORS things """ def __init__(self, *, allowed_origins: set, allowed_headers: str, allowed_methods: str): if len(allowed_origins) == 1: self.allowed_origins = allowed_origins.pop() else: self.allowed_origins = allowed_origins self.allowed_headers = allowed_headers self.allowed_methods = allowed_methods def add_cors_headers(self, request: Request, response: Response): if isinstance(self.allowed_origins, str): # only one origin result = self.allowed_origins elif request.headers.get('origin') in self.allowed_origins: result = request.headers.get('origin') else: result = 'none' response.headers['Access-Control-Allow-Origin'] = result response.headers['Access-Control-Allow-Credentials'] = 'true' if request.method == Methods.OPTIONS: if self.allowed_headers: response.headers['Access-Control-Allow-Headers'] = \ self.allowed_headers if self.allowed_methods: response.headers['Access-Control-Allow-Methods'] = \ self.allowed_methods async def options_handler(self, request): return Response(status=HTTPStatus.NO_CONTENT) @staticmethod def from_config(config): try: config['cors']['handle'] except ConfigError: return None origins = config['cors']['allowed_origins'] if not origins: # if empty raise ValueError('Must have at least one cors.allowed_origins' ' in your configuration.') origins = {origin.strip() for origin in origins.split(',')} try: headers = config['cors']['allowed_headers'] except ConfigError: headers = '' try: methods = config['cors']['allowed_methods'] except ConfigError: methods = '' return CORSHandler(allowed_origins=origins, allowed_headers=headers, allowed_methods=methods) PK!ea&& waspy/app.pyimport asyncio import logging import signal import sys from functools import wraps from typing import List, Union, Iterable from http import HTTPStatus from concurrent.futures import CancelledError from contextvars import ContextVar, copy_context from copy import copy from waspy.parser import ParserABC, JSONParser, parsers as app_parsers from ._cors import CORSHandler from .client import Client from .webtypes import Request, Response from .exceptions import ResponseError, UnsupportedMediaType from .router import Router from .transports.transportabc import TransportABC from .transports.rabbitmqtransport import NackMePleaseError from .configuration import Config, ConfigError from .ctx import request_context from . import errorlogging logging.basicConfig(format='%(asctime)s %(levelname)s [%(module)s.%(funcName)s] %(message)s') logger = logging.getLogger('waspy') async def response_wrapper_factory(app, handler): @wraps(handler) async def wrap_response_middleware(request): response = await handler(request) if not isinstance(response, Response): if isinstance(response, tuple): body = response[0] status = response[1] response = Response(status=status, body=body) elif isinstance(response, dict) or isinstance(response, str): response = Response(body=response) elif response is None: response = Response(status=HTTPStatus.NO_CONTENT) else: raise ValueError('Request handler returned an invalid type.' ' Return types should be one of ' '[Response, dict, str, None, (dict, int)]') return response return wrap_response_middleware class Application: def __init__(self, transport: Union[TransportABC, Iterable[TransportABC]]=None, *, middlewares: List[callable]=None, default_headers: dict=None, debug: bool=False, router: Router=None, config: Config=None, loop=None, parsers=None, default_content_type='application/json'): if transport is None: from waspy.transports.httptransport import HTTPTransport transport = HTTPTransport() if isinstance(transport, (list, set)): transport = tuple(transport) if not isinstance(transport, tuple): transport = (transport,) if middlewares is None: middlewares = () middlewares = tuple(m for m in middlewares) middlewares += (response_wrapper_factory,) if router is None: router = Router() if default_headers is None: default_headers = {'Server': 'waspy'} if not config: config = Config() # Parser management if not parsers: parsers = [JSONParser()] for parser in parsers: self.add_parser(parser) self.transport = transport self.middlewares = middlewares self.default_headers = default_headers self.debug = debug self.router = router self.on_start = [] self.on_stop = [] self._client = None self.config = config self.raven = None self.logger = None self._cors_handler = None self.loop = loop self.default_content_type = default_content_type @property def client(self) -> Client: if not self._client: self._client = Client(transport=self.transport[0].get_client()) return self._client def add_parser(self, parser: ParserABC): app_parsers[parser.content_type] = parser def start_shutdown(self, signum=None, frame=None): # loop = asyncio.get_event_loop() for t in self.transport: t.shutdown() def run(self): if not self.loop: self.loop = asyncio.get_event_loop() loop = self.loop if self.config['debug']: logger.setLevel('DEBUG') self.loop.set_debug(True) # init logger self._create_logger() # add cors support if needed self._cors_handler = CORSHandler.from_config(self.config) if self._cors_handler: self.router.add_generic_options_handler(self._cors_handler.options_handler) # wrap handlers in middleware loop.run_until_complete(self._wrap_handlers()) for t in self.transport: t.listen(loop=loop, config=self.config) # Call on-startup hooks loop.run_until_complete(self.run_on_start_hooks()) # todo: fork/add processes? tasks = [] for t in self.transport: tasks.append(t.start(self.handle_request)) # register signals, so that stopping the service works correctly loop.add_signal_handler(signal.SIGTERM, self.start_shutdown) loop.add_signal_handler(signal.SIGINT, self.start_shutdown) # Run all transports - they shouldn't return until shutdown loop.run_until_complete(asyncio.gather(*tasks)) self.shutdown() async def run_on_start_hooks(self): """ Run all hooks in on_start. Allows for coroutines and synchronous functions. """ logger.debug("Running on start hooks") await self._run_hooks(self.on_start) async def run_on_stop_hooks(self): """ Run all hooks in on_stop. Allows for coroutines and synchronous functions. """ logger.debug("Running on stop hooks") await self._run_hooks(self.on_stop) async def handle_request(self, request: Request) -> Response: """ coroutine: This method is called by Transport implementation to handle the actual request. It returns a webtype.Response object. """ # Get handler try: try: self._set_ctx(request) handler = self.router.get_handler_for_request(request) request.app = self response = await handler(request) response.app = self except ResponseError as r: parser = app_parsers.get(request.content_type, None) # Content-Type of an error response will be the same as the incoming request # unless a parser for that content type is not found. if not parser: content_type = r.content_type if not content_type: content_type = self.default_content_type else: content_type = request.content_type response = Response( headers=r.headers, correlation_id=r.correlation_id, body=r.body, status=r.status, content_type=content_type ) response.app = self if r.log: exc_info = sys.exc_info() self.logger.log_exception(request, exc_info, level='warning') # invoke serialization (json) to make sure it works _ = response.body except CancelledError: # This error can happen if a client closes the connection # The response shouldnt really ever be used return None except asyncio.TimeoutError: response = Response(status=HTTPStatus.GATEWAY_TIMEOUT, body={'message': 'Gateway Timeout'}) response.app = self except NackMePleaseError: """ See message where this error is defined """ raise except Exception: exc_info = sys.exc_info() self.logger.log_exception(request, exc_info) response = Response(status=HTTPStatus.INTERNAL_SERVER_ERROR, body={'message': 'Server Error'}) response.app = self if not response.correlation_id: response.correlation_id = request.correlation_id if self._cors_handler is not None: self._cors_handler.add_cors_headers(request, response) # add default headers response.headers = {**self.default_headers, **response.headers} return response def _set_ctx(self, request): ctx = {'correlation_id': request.correlation_id, 'ctx_headers': {k: v for k, v in request.headers.items() if k.startswith('x-ctx-')}} request_context.set(ctx) async def _wrap_handlers(self): handler_gen = self.router._get_and_wrap_routes() try: handler = next(handler_gen) while True: wrapped = handler for middleware in self.middlewares[::-1]: wrapped = await middleware(self, wrapped) handler = handler_gen.send(wrapped) except StopIteration: pass def _create_logger(self): try: dsn = self.config['sentry']['dsn'] except (ConfigError, ValueError): self.logger = errorlogging.ErrorLoggingBase() else: try: env = self.config['app_env'] except (ConfigError): env = 'waspy' self.logger = errorlogging.SentryLogging( dsn=dsn, environment=env ) async def _run_hooks(self, hooks): coros = [] while len(hooks): task = hooks.pop() if asyncio.iscoroutinefunction(task): coros.append(task(self)) else: task(self) await asyncio.gather(*coros) def shutdown(self): self.loop.run_until_complete(self.run_on_stop_hooks()) self.loop.close() PK!ea&& waspy/app.pyimport asyncio import logging import signal import sys from functools import wraps from typing import List, Union, Iterable from http import HTTPStatus from concurrent.futures import CancelledError from contextvars import ContextVar, copy_context from copy import copy from waspy.parser import ParserABC, JSONParser, parsers as app_parsers from ._cors import CORSHandler from .client import Client from .webtypes import Request, Response from .exceptions import ResponseError, UnsupportedMediaType from .router import Router from .transports.transportabc import TransportABC from .transports.rabbitmqtransport import NackMePleaseError from .configuration import Config, ConfigError from .ctx import request_context from . import errorlogging logging.basicConfig(format='%(asctime)s %(levelname)s [%(module)s.%(funcName)s] %(message)s') logger = logging.getLogger('waspy') async def response_wrapper_factory(app, handler): @wraps(handler) async def wrap_response_middleware(request): response = await handler(request) if not isinstance(response, Response): if isinstance(response, tuple): body = response[0] status = response[1] response = Response(status=status, body=body) elif isinstance(response, dict) or isinstance(response, str): response = Response(body=response) elif response is None: response = Response(status=HTTPStatus.NO_CONTENT) else: raise ValueError('Request handler returned an invalid type.' ' Return types should be one of ' '[Response, dict, str, None, (dict, int)]') return response return wrap_response_middleware class Application: def __init__(self, transport: Union[TransportABC, Iterable[TransportABC]]=None, *, middlewares: List[callable]=None, default_headers: dict=None, debug: bool=False, router: Router=None, config: Config=None, loop=None, parsers=None, default_content_type='application/json'): if transport is None: from waspy.transports.httptransport import HTTPTransport transport = HTTPTransport() if isinstance(transport, (list, set)): transport = tuple(transport) if not isinstance(transport, tuple): transport = (transport,) if middlewares is None: middlewares = () middlewares = tuple(m for m in middlewares) middlewares += (response_wrapper_factory,) if router is None: router = Router() if default_headers is None: default_headers = {'Server': 'waspy'} if not config: config = Config() # Parser management if not parsers: parsers = [JSONParser()] for parser in parsers: self.add_parser(parser) self.transport = transport self.middlewares = middlewares self.default_headers = default_headers self.debug = debug self.router = router self.on_start = [] self.on_stop = [] self._client = None self.config = config self.raven = None self.logger = None self._cors_handler = None self.loop = loop self.default_content_type = default_content_type @property def client(self) -> Client: if not self._client: self._client = Client(transport=self.transport[0].get_client()) return self._client def add_parser(self, parser: ParserABC): app_parsers[parser.content_type] = parser def start_shutdown(self, signum=None, frame=None): # loop = asyncio.get_event_loop() for t in self.transport: t.shutdown() def run(self): if not self.loop: self.loop = asyncio.get_event_loop() loop = self.loop if self.config['debug']: logger.setLevel('DEBUG') self.loop.set_debug(True) # init logger self._create_logger() # add cors support if needed self._cors_handler = CORSHandler.from_config(self.config) if self._cors_handler: self.router.add_generic_options_handler(self._cors_handler.options_handler) # wrap handlers in middleware loop.run_until_complete(self._wrap_handlers()) for t in self.transport: t.listen(loop=loop, config=self.config) # Call on-startup hooks loop.run_until_complete(self.run_on_start_hooks()) # todo: fork/add processes? tasks = [] for t in self.transport: tasks.append(t.start(self.handle_request)) # register signals, so that stopping the service works correctly loop.add_signal_handler(signal.SIGTERM, self.start_shutdown) loop.add_signal_handler(signal.SIGINT, self.start_shutdown) # Run all transports - they shouldn't return until shutdown loop.run_until_complete(asyncio.gather(*tasks)) self.shutdown() async def run_on_start_hooks(self): """ Run all hooks in on_start. Allows for coroutines and synchronous functions. """ logger.debug("Running on start hooks") await self._run_hooks(self.on_start) async def run_on_stop_hooks(self): """ Run all hooks in on_stop. Allows for coroutines and synchronous functions. """ logger.debug("Running on stop hooks") await self._run_hooks(self.on_stop) async def handle_request(self, request: Request) -> Response: """ coroutine: This method is called by Transport implementation to handle the actual request. It returns a webtype.Response object. """ # Get handler try: try: self._set_ctx(request) handler = self.router.get_handler_for_request(request) request.app = self response = await handler(request) response.app = self except ResponseError as r: parser = app_parsers.get(request.content_type, None) # Content-Type of an error response will be the same as the incoming request # unless a parser for that content type is not found. if not parser: content_type = r.content_type if not content_type: content_type = self.default_content_type else: content_type = request.content_type response = Response( headers=r.headers, correlation_id=r.correlation_id, body=r.body, status=r.status, content_type=content_type ) response.app = self if r.log: exc_info = sys.exc_info() self.logger.log_exception(request, exc_info, level='warning') # invoke serialization (json) to make sure it works _ = response.body except CancelledError: # This error can happen if a client closes the connection # The response shouldnt really ever be used return None except asyncio.TimeoutError: response = Response(status=HTTPStatus.GATEWAY_TIMEOUT, body={'message': 'Gateway Timeout'}) response.app = self except NackMePleaseError: """ See message where this error is defined """ raise except Exception: exc_info = sys.exc_info() self.logger.log_exception(request, exc_info) response = Response(status=HTTPStatus.INTERNAL_SERVER_ERROR, body={'message': 'Server Error'}) response.app = self if not response.correlation_id: response.correlation_id = request.correlation_id if self._cors_handler is not None: self._cors_handler.add_cors_headers(request, response) # add default headers response.headers = {**self.default_headers, **response.headers} return response def _set_ctx(self, request): ctx = {'correlation_id': request.correlation_id, 'ctx_headers': {k: v for k, v in request.headers.items() if k.startswith('x-ctx-')}} request_context.set(ctx) async def _wrap_handlers(self): handler_gen = self.router._get_and_wrap_routes() try: handler = next(handler_gen) while True: wrapped = handler for middleware in self.middlewares[::-1]: wrapped = await middleware(self, wrapped) handler = handler_gen.send(wrapped) except StopIteration: pass def _create_logger(self): try: dsn = self.config['sentry']['dsn'] except (ConfigError, ValueError): self.logger = errorlogging.ErrorLoggingBase() else: try: env = self.config['app_env'] except (ConfigError): env = 'waspy' self.logger = errorlogging.SentryLogging( dsn=dsn, environment=env ) async def _run_hooks(self, hooks): coros = [] while len(hooks): task = hooks.pop() if asyncio.iscoroutinefunction(task): coros.append(task(self)) else: task(self) await asyncio.gather(*coros) def shutdown(self): self.loop.run_until_complete(self.run_on_stop_hooks()) self.loop.close() PK!Nwaspy/client.pyimport json from urllib import parse import uuid import warnings import asyncio from .webtypes import QueryParams, Request, Methods from .ctx import request_context class Client: """ Generic Client class for making a wasp client """ __slots__ = ('transport',) def __init__(self, transport=None, **kwargs): if not transport: from waspy.transports import HTTPClientTransport transport = HTTPClientTransport(**kwargs) self.transport = transport def make_request(self, method, service, path, body=None, query_params: QueryParams=None, headers: dict=None, correlation_id: str=None, content_type: str='application/json', context: Request=None, timeout=30, **kwargs) -> asyncio.coroutine: """ Make a request to another service. If `context` is provided, then context and correlation will be pulled from the provided request object for you. This includes credentials, correlationid, and service-headers. :param method: GET/PUT/PATCH, etc. :param service: name of service :param path: request object path :param body: body of request :param query_params: :param headers: :param correlation_id: :param content_type: :param context: A request object from which a "child-request" will be made :param timeout: Time in seconds the client will wait befor raising an asyncio.TimeoutError :param kwargs: Just a place holder so transport specific options can be passed through :return: """ if not isinstance(method, Methods): method = Methods(method.upper()) if content_type == 'application/json' and isinstance(body, dict): body = json.dumps(body) if isinstance(query_params, dict): query_string = parse.urlencode(query_params) elif isinstance(query_params, QueryParams): query_string = str(query_params) else: query_string = '' headers = headers or {} ctx = request_context.get() if context: warnings.warn("Passing in a context to waspy client is deprecated. " "Passed in context will be ignored", DeprecationWarning) if not correlation_id: correlation_id = ctx['correlation_id'] headers = {**headers, **ctx['ctx_headers']} exchange = headers.get('x-ctx-exchange-override', None) if exchange: kwargs['exchange'] = exchange if isinstance(body, str): body = body.encode() response = asyncio.wait_for( self.transport.make_request( service, method.name, path, body=body, query=query_string, headers=headers, correlation_id=correlation_id, content_type=content_type, timeout=timeout, **kwargs), timeout=timeout) return response # response is a coroutine that must be awaited def get(self, service, path, **kwargs): """ Make a get request (this returns a coroutine)""" return self.make_request(Methods.GET, service, path, **kwargs) def post(self, service, path, body, **kwargs): """ Make a post request (this returns a coroutine)""" return self.make_request(Methods.POST, service, path, body=body, **kwargs) def put(self, service, path, body, **kwargs): """ Make a put request (this returns a coroutine)""" return self.make_request(Methods.POST, service, path, body=body, **kwargs) def patch(self, service, path, body, **kwargs): """ Make a patche requests (this returns a coroutine)""" return self.make_request(Methods.PATCH, service, path, body=body, **kwargs) def delete(self, service, path, **kwargs): """ Make a delete requests (this returns a coroutine)""" return self.make_request(Methods.DELETE, service, path, **kwargs) PK!Nwaspy/client.pyimport json from urllib import parse import uuid import warnings import asyncio from .webtypes import QueryParams, Request, Methods from .ctx import request_context class Client: """ Generic Client class for making a wasp client """ __slots__ = ('transport',) def __init__(self, transport=None, **kwargs): if not transport: from waspy.transports import HTTPClientTransport transport = HTTPClientTransport(**kwargs) self.transport = transport def make_request(self, method, service, path, body=None, query_params: QueryParams=None, headers: dict=None, correlation_id: str=None, content_type: str='application/json', context: Request=None, timeout=30, **kwargs) -> asyncio.coroutine: """ Make a request to another service. If `context` is provided, then context and correlation will be pulled from the provided request object for you. This includes credentials, correlationid, and service-headers. :param method: GET/PUT/PATCH, etc. :param service: name of service :param path: request object path :param body: body of request :param query_params: :param headers: :param correlation_id: :param content_type: :param context: A request object from which a "child-request" will be made :param timeout: Time in seconds the client will wait befor raising an asyncio.TimeoutError :param kwargs: Just a place holder so transport specific options can be passed through :return: """ if not isinstance(method, Methods): method = Methods(method.upper()) if content_type == 'application/json' and isinstance(body, dict): body = json.dumps(body) if isinstance(query_params, dict): query_string = parse.urlencode(query_params) elif isinstance(query_params, QueryParams): query_string = str(query_params) else: query_string = '' headers = headers or {} ctx = request_context.get() if context: warnings.warn("Passing in a context to waspy client is deprecated. " "Passed in context will be ignored", DeprecationWarning) if not correlation_id: correlation_id = ctx['correlation_id'] headers = {**headers, **ctx['ctx_headers']} exchange = headers.get('x-ctx-exchange-override', None) if exchange: kwargs['exchange'] = exchange if isinstance(body, str): body = body.encode() response = asyncio.wait_for( self.transport.make_request( service, method.name, path, body=body, query=query_string, headers=headers, correlation_id=correlation_id, content_type=content_type, timeout=timeout, **kwargs), timeout=timeout) return response # response is a coroutine that must be awaited def get(self, service, path, **kwargs): """ Make a get request (this returns a coroutine)""" return self.make_request(Methods.GET, service, path, **kwargs) def post(self, service, path, body, **kwargs): """ Make a post request (this returns a coroutine)""" return self.make_request(Methods.POST, service, path, body=body, **kwargs) def put(self, service, path, body, **kwargs): """ Make a put request (this returns a coroutine)""" return self.make_request(Methods.POST, service, path, body=body, **kwargs) def patch(self, service, path, body, **kwargs): """ Make a patche requests (this returns a coroutine)""" return self.make_request(Methods.PATCH, service, path, body=body, **kwargs) def delete(self, service, path, **kwargs): """ Make a delete requests (this returns a coroutine)""" return self.make_request(Methods.DELETE, service, path, **kwargs) PK!wwaspy/configuration.pyimport os import yaml CONFIG_LOCATION = os.getenv('WASPY_CONFIG_LOCATION') class ConfigError(KeyError): """ Raised when a requested configuration can not be found """ def __init__(self, config_name, env_var): super().__init__("""\ No configuration found for "{}". \ Add the environment variable {} or add \ the key to your config.yaml file.\ """.format(config_name, env_var)) NO_CONFIG_LOCATION_ERROR_MESSAGE = """ No config file specified. \ You can use `app.config.from_file(file_location)` or set a location using \ the environment variable "WASPY_CONFIG_LOCATION".\ """ CONFIG_NOT_YET_LOADED_ERROR_MESSAGE = """ Configuration file not yet loaded. \ You should use `[app.]config.from_file(location)` or `[app.]config.load` \ before trying to access a configuration value.\ """ class Config: """ load configuration from envvar or config file You can get settings from environment variables or a config.yaml file Environment variables have a higher precedence over the config file Use `config['section']['subsection']['key']` to get the value SECTION_SUBSECTION_KEY from environment variables or section.subsection.key from a yaml file (usually written: [section.subsection] key = value in a yaml file) """ def __init__(self, _basename=None, _defaults=None): self.default_options = _defaults self.basename = _basename def from_file(self, location): self._load_config(filepath=location) return self def load(self): if self.default_options is None: self._load_config() def _load_config(self, filepath=None): if filepath is None: if CONFIG_LOCATION is None: raise ValueError(NO_CONFIG_LOCATION_ERROR_MESSAGE) filepath = os.path.abspath(CONFIG_LOCATION) with open(filepath, 'r') as f: config = yaml.safe_load(f) self.default_options = config def __getitem__(self, item): if self.default_options is None: raise ValueError(CONFIG_NOT_YET_LOADED_ERROR_MESSAGE) default = self.default_options.get(item) if isinstance(default, dict): return Config(_basename=self._create_basename(item), _defaults=default) env = self._get_env_var(item) if env is None: if default is None: raise ConfigError(self._create_basename(item), self._create_env_var_string(item)) return default return env def __contains__(self, item): try: self.__getitem__(item) return True except ConfigError: return False def _create_basename(self, item): if self.basename: return self.basename + '.' + item else: return item def _create_env_var_string(self, item): if not self.basename: return item.upper() else: return '_'.join(self._create_basename(item).split('.')).upper() def _get_env_var(self, item): envvar_string = self._create_env_var_string(item) env = os.getenv(envvar_string) if env is not None: if isinstance(env, str): # env vars are always passed in as strings in docker world # here we will try to convert them to basic types if we can if env.lower() == 'true': return True if env.lower() == 'false': return False try: env = int(env) except ValueError: pass return env PK!wwaspy/configuration.pyimport os import yaml CONFIG_LOCATION = os.getenv('WASPY_CONFIG_LOCATION') class ConfigError(KeyError): """ Raised when a requested configuration can not be found """ def __init__(self, config_name, env_var): super().__init__("""\ No configuration found for "{}". \ Add the environment variable {} or add \ the key to your config.yaml file.\ """.format(config_name, env_var)) NO_CONFIG_LOCATION_ERROR_MESSAGE = """ No config file specified. \ You can use `app.config.from_file(file_location)` or set a location using \ the environment variable "WASPY_CONFIG_LOCATION".\ """ CONFIG_NOT_YET_LOADED_ERROR_MESSAGE = """ Configuration file not yet loaded. \ You should use `[app.]config.from_file(location)` or `[app.]config.load` \ before trying to access a configuration value.\ """ class Config: """ load configuration from envvar or config file You can get settings from environment variables or a config.yaml file Environment variables have a higher precedence over the config file Use `config['section']['subsection']['key']` to get the value SECTION_SUBSECTION_KEY from environment variables or section.subsection.key from a yaml file (usually written: [section.subsection] key = value in a yaml file) """ def __init__(self, _basename=None, _defaults=None): self.default_options = _defaults self.basename = _basename def from_file(self, location): self._load_config(filepath=location) return self def load(self): if self.default_options is None: self._load_config() def _load_config(self, filepath=None): if filepath is None: if CONFIG_LOCATION is None: raise ValueError(NO_CONFIG_LOCATION_ERROR_MESSAGE) filepath = os.path.abspath(CONFIG_LOCATION) with open(filepath, 'r') as f: config = yaml.safe_load(f) self.default_options = config def __getitem__(self, item): if self.default_options is None: raise ValueError(CONFIG_NOT_YET_LOADED_ERROR_MESSAGE) default = self.default_options.get(item) if isinstance(default, dict): return Config(_basename=self._create_basename(item), _defaults=default) env = self._get_env_var(item) if env is None: if default is None: raise ConfigError(self._create_basename(item), self._create_env_var_string(item)) return default return env def __contains__(self, item): try: self.__getitem__(item) return True except ConfigError: return False def _create_basename(self, item): if self.basename: return self.basename + '.' + item else: return item def _create_env_var_string(self, item): if not self.basename: return item.upper() else: return '_'.join(self._create_basename(item).split('.')).upper() def _get_env_var(self, item): envvar_string = self._create_env_var_string(item) env = os.getenv(envvar_string) if env is not None: if isinstance(env, str): # env vars are always passed in as strings in docker world # here we will try to convert them to basic types if we can if env.lower() == 'true': return True if env.lower() == 'false': return False try: env = int(env) except ValueError: pass return env PK!TT waspy/ctx.pyfrom contextvars import ContextVar request_context = ContextVar('request_context') PK!TT waspy/ctx.pyfrom contextvars import ContextVar request_context = ContextVar('request_context') PK!HNN N waspy/errorlogging.pyimport sys import logging from waspy.exceptions import UnsupportedMediaType logger = logging.getLogger('waspy') class ErrorLoggingBase: def log_exception(self, request, exc_info, level='error'): try: level = getattr(logging, level.upper()) except: level = logging.ERROR logger.log(level, 'An error occurred while handling request: {}' .format(request), exc_info=exc_info) def log_warning(self, request, message): logger.warning(message) class SentryLogging(ErrorLoggingBase): def __init__(self, *, dsn, environment): try: from raven import Client except ImportError as e: raise ImportWarning( 'You must install raven in order to use the sentry logger' ) from e self.raven = Client(dsn, environment=environment) self.raven.transaction.clear() def _get_sentry_details(self, request, exc_info): data = { 'request': { 'method': request.method.value, 'data': request.body if request.original_body else None, 'query_string': request.query_string, 'url': '/' + request.path.replace('.', '/'), 'content-type': request.content_type, 'headers': request.headers }, 'user': { } } tags = { # put things you want to filter by in sentry here 'handler': request._handler.__qualname__, } extra_data = { # put extra data here 'correlation_id': request.correlation_id } self.add_context_data(request, data, tags, extra_data, exc_info) return data, tags, extra_data def log_warning(self, request, message=None): exc_info = sys.exc_info() super().log_warning(request, message=message) data, tags, extra = self._get_sentry_details(request, None) self.raven.captureException(exc_info=exc_info, message=message, data=data, extra=extra, tags=tags, level='warning') def log_exception(self, request, exc_info, level='error'): super().log_exception(request, exc_info, level=level) data, tags, extra = self._get_sentry_details(request, exc_info) self.raven.captureException(data=data, extra=extra, tags=tags, exc_info=exc_info, level=level) def add_context_data(self, request, data, tags, extra_data, exc_info): """ Override this method to add your own sentry data. :param request: the request the error occured on :param data: This is the normal sentry data, prepopulated with the request data for you. Dictionary. :param tags: things you want to filter by in sentry. Dictionary :param extra_data: extra informational data. Dictionary :param exc_info: the exception info tuple provide from sys.exc_info() :return: Nothing. Just modify the passed in dictionaries """ pass PK!HNN N waspy/errorlogging.pyimport sys import logging from waspy.exceptions import UnsupportedMediaType logger = logging.getLogger('waspy') class ErrorLoggingBase: def log_exception(self, request, exc_info, level='error'): try: level = getattr(logging, level.upper()) except: level = logging.ERROR logger.log(level, 'An error occurred while handling request: {}' .format(request), exc_info=exc_info) def log_warning(self, request, message): logger.warning(message) class SentryLogging(ErrorLoggingBase): def __init__(self, *, dsn, environment): try: from raven import Client except ImportError as e: raise ImportWarning( 'You must install raven in order to use the sentry logger' ) from e self.raven = Client(dsn, environment=environment) self.raven.transaction.clear() def _get_sentry_details(self, request, exc_info): data = { 'request': { 'method': request.method.value, 'data': request.body if request.original_body else None, 'query_string': request.query_string, 'url': '/' + request.path.replace('.', '/'), 'content-type': request.content_type, 'headers': request.headers }, 'user': { } } tags = { # put things you want to filter by in sentry here 'handler': request._handler.__qualname__, } extra_data = { # put extra data here 'correlation_id': request.correlation_id } self.add_context_data(request, data, tags, extra_data, exc_info) return data, tags, extra_data def log_warning(self, request, message=None): exc_info = sys.exc_info() super().log_warning(request, message=message) data, tags, extra = self._get_sentry_details(request, None) self.raven.captureException(exc_info=exc_info, message=message, data=data, extra=extra, tags=tags, level='warning') def log_exception(self, request, exc_info, level='error'): super().log_exception(request, exc_info, level=level) data, tags, extra = self._get_sentry_details(request, exc_info) self.raven.captureException(data=data, extra=extra, tags=tags, exc_info=exc_info, level=level) def add_context_data(self, request, data, tags, extra_data, exc_info): """ Override this method to add your own sentry data. :param request: the request the error occured on :param data: This is the normal sentry data, prepopulated with the request data for you. Dictionary. :param tags: things you want to filter by in sentry. Dictionary :param extra_data: extra informational data. Dictionary :param exc_info: the exception info tuple provide from sys.exc_info() :return: Nothing. Just modify the passed in dictionaries """ pass PK!V+~~waspy/exceptions.pyfrom http import HTTPStatus class ResponseError(Exception): def __init__(self, message=None, status: HTTPStatus=None, *, body=None, headers=None, correlation_id=None, reason=None, content_type=None, log=False): super().__init__(message) self.message = message if hasattr(self, 'status') and status is None: status = self.status if hasattr(self, 'body') and body is None: body = self.body if hasattr(self, 'reason') and reason is None: reason = self.reason if hasattr(self, 'log') and log is False: log = self.log if hasattr(self, 'content_type') and content_type is None: content_type = self.content_type if reason and not body: body = {'reason': reason} self.status = status self.body = body self.log = log self.headers = headers self.correlation_id = correlation_id self.reason = reason self.content_type = content_type class ParseError(ResponseError): status = HTTPStatus.BAD_REQUEST def __init__(self, reason): """ Example raise ParseError("Invalid JSON") """ super().__init__(message=reason, reason=reason) class UnsupportedMediaType(ResponseError): status = HTTPStatus.UNSUPPORTED_MEDIA_TYPE def __init__(self, content_type): super().__init__( message=content_type, reason=f'Unsupported media type "{content_type}" in request' ) class NotRoutableError(ResponseError): status = HTTPStatus.NOT_FOUND reason = 'No route found' PK!V+~~waspy/exceptions.pyfrom http import HTTPStatus class ResponseError(Exception): def __init__(self, message=None, status: HTTPStatus=None, *, body=None, headers=None, correlation_id=None, reason=None, content_type=None, log=False): super().__init__(message) self.message = message if hasattr(self, 'status') and status is None: status = self.status if hasattr(self, 'body') and body is None: body = self.body if hasattr(self, 'reason') and reason is None: reason = self.reason if hasattr(self, 'log') and log is False: log = self.log if hasattr(self, 'content_type') and content_type is None: content_type = self.content_type if reason and not body: body = {'reason': reason} self.status = status self.body = body self.log = log self.headers = headers self.correlation_id = correlation_id self.reason = reason self.content_type = content_type class ParseError(ResponseError): status = HTTPStatus.BAD_REQUEST def __init__(self, reason): """ Example raise ParseError("Invalid JSON") """ super().__init__(message=reason, reason=reason) class UnsupportedMediaType(ResponseError): status = HTTPStatus.UNSUPPORTED_MEDIA_TYPE def __init__(self, content_type): super().__init__( message=content_type, reason=f'Unsupported media type "{content_type}" in request' ) class NotRoutableError(ResponseError): status = HTTPStatus.NOT_FOUND reason = 'No route found' PK!waspy/listeners/__init__.pyPK!waspy/listeners/__init__.pyPK!ZՍn n $waspy/listeners/rabbitmq_listener.pyimport json import aioamqp from waspy.listeners.transport_listener_abc import TransportListenerABC from waspy.transports.rabbitmqtransport import RabbitMQTransport class RabbitMQTransportListener(TransportListenerABC): queue = '' exchange = '' routing_key = '' exchange_type = 'topic' declare_exchange = True declare_queue = True durable = True auto_delete = False exclusive = False no_wait = False prefetch_count = 1 use_acks = True nack_on_error = True requeue_nacks = True json_payload = True def __init__(self, ): self.transport = None self.channel: aioamqp.channel.Channel = None self._consumer_tag = None self._bootstrapped = False async def set_channel(self, channel): self._bootstrapped = False if self.channel and self.channel.is_open: await self.transport.close_channel(self.channel) self.channel = channel await self._bootstrap_channel() def set_transport(self, transport: RabbitMQTransport): if not isinstance(transport, RabbitMQTransport): raise TypeError( "Invalid transport received. "\ f"Expected {type(RabbitMQTransport)}, got type{transport}" ) self.transport = transport async def start(self): if not self.channel: self.channel = await self.transport.create_channel() await self._bootstrap_channel() resp = await self.channel.basic_consume( self._handle_work, queue_name=self.queue, no_ack=not self.use_acks ) self._consumer_tag = resp.get('consumer_tag') async def _handle_work(self, _, body, envelope, properties): if self.json_payload: body = json.loads(body) try: await self.handle_work(body, evelope=envelope, properties=properties) except Exception as e: if self.nack_on_error and self.use_acks: await self.channel.basic_client_nack(envelope.delivery_tag, requeue=self.requeue_nacks) raise e else: if self.use_acks: await self.channel.basic_client_ack(envelope.delivery_tag) async def exchange_declare(self): """ Override this method to change how a exchange is declared """ await self.channel.exchange_declare( self.exchange, self.exchange_type, durable=self.durable, auto_delete=self.auto_delete, no_wait=self.no_wait, ) async def queue_declare(self): """ Override this method to change how a queue is declared """ await self.channel.queue_declare( self.queue, durable=self.durable, exclusive=self.exclusive, no_wait=self.no_wait ) async def _bootstrap_channel(self): if self._bootstrapped: return self._bootstrapped = True await self.channel.basic_qos(prefetch_count=self.prefetch_count) if self.declare_queue: await self.queue_declare() if self.exchange: if self.declare_exchange: await self.exchange_declare() await self.channel.queue_bind( self.queue, self.exchange, self.routing_key, ) PK!ZՍn n $waspy/listeners/rabbitmq_listener.pyimport json import aioamqp from waspy.listeners.transport_listener_abc import TransportListenerABC from waspy.transports.rabbitmqtransport import RabbitMQTransport class RabbitMQTransportListener(TransportListenerABC): queue = '' exchange = '' routing_key = '' exchange_type = 'topic' declare_exchange = True declare_queue = True durable = True auto_delete = False exclusive = False no_wait = False prefetch_count = 1 use_acks = True nack_on_error = True requeue_nacks = True json_payload = True def __init__(self, ): self.transport = None self.channel: aioamqp.channel.Channel = None self._consumer_tag = None self._bootstrapped = False async def set_channel(self, channel): self._bootstrapped = False if self.channel and self.channel.is_open: await self.transport.close_channel(self.channel) self.channel = channel await self._bootstrap_channel() def set_transport(self, transport: RabbitMQTransport): if not isinstance(transport, RabbitMQTransport): raise TypeError( "Invalid transport received. "\ f"Expected {type(RabbitMQTransport)}, got type{transport}" ) self.transport = transport async def start(self): if not self.channel: self.channel = await self.transport.create_channel() await self._bootstrap_channel() resp = await self.channel.basic_consume( self._handle_work, queue_name=self.queue, no_ack=not self.use_acks ) self._consumer_tag = resp.get('consumer_tag') async def _handle_work(self, _, body, envelope, properties): if self.json_payload: body = json.loads(body) try: await self.handle_work(body, evelope=envelope, properties=properties) except Exception as e: if self.nack_on_error and self.use_acks: await self.channel.basic_client_nack(envelope.delivery_tag, requeue=self.requeue_nacks) raise e else: if self.use_acks: await self.channel.basic_client_ack(envelope.delivery_tag) async def exchange_declare(self): """ Override this method to change how a exchange is declared """ await self.channel.exchange_declare( self.exchange, self.exchange_type, durable=self.durable, auto_delete=self.auto_delete, no_wait=self.no_wait, ) async def queue_declare(self): """ Override this method to change how a queue is declared """ await self.channel.queue_declare( self.queue, durable=self.durable, exclusive=self.exclusive, no_wait=self.no_wait ) async def _bootstrap_channel(self): if self._bootstrapped: return self._bootstrapped = True await self.channel.basic_qos(prefetch_count=self.prefetch_count) if self.declare_queue: await self.queue_declare() if self.exchange: if self.declare_exchange: await self.exchange_declare() await self.channel.queue_bind( self.queue, self.exchange, self.routing_key, ) PK!)waspy/listeners/transport_listener_abc.pyfrom abc import ABC, abstractmethod from typing import Dict class TransportListenerABC(ABC): @abstractmethod async def start(self) -> None: pass @abstractmethod async def handle_work(self, data: Dict, **kwargs) -> None: pass PK!)waspy/listeners/transport_listener_abc.pyfrom abc import ABC, abstractmethod from typing import Dict class TransportListenerABC(ABC): @abstractmethod async def start(self) -> None: pass @abstractmethod async def handle_work(self, data: Dict, **kwargs) -> None: pass PK!xi""waspy/parser.pyfrom abc import ABC, abstractmethod import json from waspy.exceptions import ParseError class ParserABC(ABC): """ Abstract Base Class for implementing encoding codecs """ @property @abstractmethod def content_type(self) -> str: return '' @abstractmethod def encode(self, data) -> bytes: pass @abstractmethod def decode(self, data: bytes): pass class JSONParser(ParserABC): """ The default parser for waspy. """ content_type = 'application/json' def encode(self, data) -> bytes: return json.dumps(data) def decode(self, data: bytes): if data: try: return json.loads(data) except json.JSONDecodeError: raise ParseError("Invalid JSON") parsers = {} PK!xi""waspy/parser.pyfrom abc import ABC, abstractmethod import json from waspy.exceptions import ParseError class ParserABC(ABC): """ Abstract Base Class for implementing encoding codecs """ @property @abstractmethod def content_type(self) -> str: return '' @abstractmethod def encode(self, data) -> bytes: pass @abstractmethod def decode(self, data: bytes): pass class JSONParser(ParserABC): """ The default parser for waspy. """ content_type = 'application/json' def encode(self, data) -> bytes: return json.dumps(data) def decode(self, data: bytes): if data: try: return json.loads(data) except json.JSONDecodeError: raise ParseError("Invalid JSON") parsers = {} PK!l!!waspy/router.pyimport warnings from contextlib import contextmanager from http import HTTPStatus from typing import Callable, Union from enum import Enum from .exceptions import ResponseError """ The below constant is special key in the router dictionary that determines an id section of the url. The / character is the only character that can't possibly be used in a url path or id, since it is the path delimiter. Therefore, there are '/' characters in the key, so that it can never accidentally be overridden. p.s. '_id' is to make the dictionary slightly more readable when debugging """ ID_KEY = '/_id/' class NotAValidURLError(Exception): """ When a url path syntax is not valid """ class Methods(Enum): GET = 'GET' POST = 'POST' PUT = 'PUT' DELETE = 'DELETE' OPTIONS = 'OPTIONS' PATCH = 'PATCH' HEAD = 'HEAD' PUBLISH = 'PUBLISH' # NOT VALID HTTP METHOD async def _send_404(request): raise ResponseError(status=HTTPStatus.NOT_FOUND) async def _send_405(request): raise ResponseError(status=HTTPStatus.METHOD_NOT_ALLOWED) class Router: def __init__(self): self._routes = {} """ Routes looks like: {path_section1: {path_section2: {method: (handler, params)}}} before startup and {path_section1: {papath_section2 {method: (wrapped, handler, params)}} after startup """ self._static_routes = {} """ Static routes are just simple d[url][method] lookups and skip middlewares """ self.options_handler = None self.handle_404 = _send_404 # the 404 route will skip middlewares self.handle_405 = _send_405 # the 405 handler will skip middleware self._prefix = '' # A list of tuples (method, url) self.urls = [] def _get_and_wrap_routes(self, _d=None): if _d is None: _d = self._routes for key, value in _d.items(): # if value is a dictionary, keep going # if value is a tuple, then wrap it! if isinstance(value, dict): yield from self._get_and_wrap_routes(_d=value) else: handler, params = value wrapped = yield handler _d[key] = (wrapped, handler, params) def get_handler_for_request(self, request): method = request.method path = request.path route = path.strip('/') if method == Methods.OPTIONS and self.options_handler is not None: # Is this an OPTION and do we have a generic options handler? request._handler = self.options_handler return self.options_handler try: return self._static_routes[route][method] except KeyError: # not in static routes pass d = self._routes params = [] raw_path_string = '/' is_a_path = False try: for portion in route.split('/'): sub = d.get(portion, None) if sub is None: # must be an ID field key = ID_KEY param = portion if ':' in portion: param, action = portion.split(':', 1) key += ':' + action sub = d[key] params.append(param) raw_path_string += '*/' else: raw_path_string += portion + '/' d = sub if any(isinstance(key, Methods) for key in d): is_a_path = True wrapped, handler, keys = d[method] except KeyError: if is_a_path: request._handler = self.handle_405 return self.handle_405 # No handler for given route request._handler = self.handle_404 return self.handle_404 assert len(keys) == len(params) for key, param in zip(keys, params): request.path_params[key] = param request._handler = handler request._raw_path = raw_path_string return wrapped def add_static_route(self, method: Union[str, Methods], route: str, handler: Callable, skip_middleware=False): """ Adds a static route. A static route is a special route that doesnt follow any of the normal rules, and never has any path parameters. Ideally, this is used for non-public facing endpoints such as "/healthcheck", or "/stats" or something of that nature. All static routes SKIP middlewares """ if isinstance(method, str): method = Methods(method.upper()) route = self._prefix + route route = route.strip('/') if route not in self._static_routes: self._static_routes[route] = {} self._static_routes[route][method] = handler def add_route(self, method: Union[str, Methods], route: str, handler: Callable): if isinstance(method, str): method = Methods(method.upper()) route = self._prefix + route route = route.strip('/') self.urls.append((method, route)) d = self._routes params = [] for portion in route.split('/'): if portion.startswith('{') and '}' in portion: sections = portion.lstrip('{').split('}', maxsplit=1) params.append(sections[0]) key = ID_KEY if sections[1]: if not sections[1].startswith(':'): raise NotAValidURLError( 'Cant have an id mixed with ' 'a static word without a colon') key += sections[1] else: key = portion if key not in d: d[key] = {} d = d[key] if method in d: raise ValueError(f"Duplicate route exists {method}") d[method] = handler, params def get(self, route: str, handler: Callable): self.add_route(Methods.GET, route, handler) def post(self, route: str, handler: Callable): self.add_route(Methods.POST, route, handler) def put(self, route: str, handler: Callable): self.add_route(Methods.PUT, route, handler) def patch(self, route: str, handler: Callable): self.add_route(Methods.PATCH, route, handler) def delete(self, route: str, handler: Callable): self.add_route(Methods.DELETE, route, handler) def head(self, route: str, handler: Callable): self.add_route(Methods.HEAD, route, handler) def options(self, route: str, handler: Callable): self.add_route(Methods.OPTIONS, route, handler) def add_get(self, route: str, handler: Callable): warnings.warn("add_get is deprecated, use get instead", DeprecationWarning) self.add_route(Methods.GET, route, handler) def add_post(self, route: str, handler: Callable): warnings.warn("add_post is deprecated, use post instead", DeprecationWarning) self.add_route(Methods.POST, route, handler) def add_put(self, route: str, handler: Callable): warnings.warn("add_put is deprecated, use put instead", DeprecationWarning) self.add_route(Methods.PUT, route, handler) def add_delete(self, route: str, handler: Callable): warnings.warn("add_delete is deprecated, use delete instead", DeprecationWarning) self.add_route(Methods.DELETE, route, handler) def add_patch(self, route: str, handler: Callable): warnings.warn("add_patch is deprecated, use patch instead", DeprecationWarning) self.add_route(Methods.PATCH, route, handler) def add_head(self, route: str, handler: Callable): warnings.warn("add_head is deprecated, use head instead", DeprecationWarning) self.add_route(Methods.HEAD, route, handler) def add_options(self, route: str, handler: Callable): warnings.warn("add_options is deprecated, use options instead", DeprecationWarning) self.add_route(Methods.OPTIONS, route, handler) def add_generic_options_handler(self, handler: Callable): """ Add a handler for all options requests. This WILL bypass middlewares """ self.options_handler = handler @contextmanager def prefix(self, prefix): """ Adds a prefix to routes contained within. """ original_prefix = self._prefix self._prefix += prefix yield self self._prefix = original_prefix PK!l!!waspy/router.pyimport warnings from contextlib import contextmanager from http import HTTPStatus from typing import Callable, Union from enum import Enum from .exceptions import ResponseError """ The below constant is special key in the router dictionary that determines an id section of the url. The / character is the only character that can't possibly be used in a url path or id, since it is the path delimiter. Therefore, there are '/' characters in the key, so that it can never accidentally be overridden. p.s. '_id' is to make the dictionary slightly more readable when debugging """ ID_KEY = '/_id/' class NotAValidURLError(Exception): """ When a url path syntax is not valid """ class Methods(Enum): GET = 'GET' POST = 'POST' PUT = 'PUT' DELETE = 'DELETE' OPTIONS = 'OPTIONS' PATCH = 'PATCH' HEAD = 'HEAD' PUBLISH = 'PUBLISH' # NOT VALID HTTP METHOD async def _send_404(request): raise ResponseError(status=HTTPStatus.NOT_FOUND) async def _send_405(request): raise ResponseError(status=HTTPStatus.METHOD_NOT_ALLOWED) class Router: def __init__(self): self._routes = {} """ Routes looks like: {path_section1: {path_section2: {method: (handler, params)}}} before startup and {path_section1: {papath_section2 {method: (wrapped, handler, params)}} after startup """ self._static_routes = {} """ Static routes are just simple d[url][method] lookups and skip middlewares """ self.options_handler = None self.handle_404 = _send_404 # the 404 route will skip middlewares self.handle_405 = _send_405 # the 405 handler will skip middleware self._prefix = '' # A list of tuples (method, url) self.urls = [] def _get_and_wrap_routes(self, _d=None): if _d is None: _d = self._routes for key, value in _d.items(): # if value is a dictionary, keep going # if value is a tuple, then wrap it! if isinstance(value, dict): yield from self._get_and_wrap_routes(_d=value) else: handler, params = value wrapped = yield handler _d[key] = (wrapped, handler, params) def get_handler_for_request(self, request): method = request.method path = request.path route = path.strip('/') if method == Methods.OPTIONS and self.options_handler is not None: # Is this an OPTION and do we have a generic options handler? request._handler = self.options_handler return self.options_handler try: return self._static_routes[route][method] except KeyError: # not in static routes pass d = self._routes params = [] raw_path_string = '/' is_a_path = False try: for portion in route.split('/'): sub = d.get(portion, None) if sub is None: # must be an ID field key = ID_KEY param = portion if ':' in portion: param, action = portion.split(':', 1) key += ':' + action sub = d[key] params.append(param) raw_path_string += '*/' else: raw_path_string += portion + '/' d = sub if any(isinstance(key, Methods) for key in d): is_a_path = True wrapped, handler, keys = d[method] except KeyError: if is_a_path: request._handler = self.handle_405 return self.handle_405 # No handler for given route request._handler = self.handle_404 return self.handle_404 assert len(keys) == len(params) for key, param in zip(keys, params): request.path_params[key] = param request._handler = handler request._raw_path = raw_path_string return wrapped def add_static_route(self, method: Union[str, Methods], route: str, handler: Callable, skip_middleware=False): """ Adds a static route. A static route is a special route that doesnt follow any of the normal rules, and never has any path parameters. Ideally, this is used for non-public facing endpoints such as "/healthcheck", or "/stats" or something of that nature. All static routes SKIP middlewares """ if isinstance(method, str): method = Methods(method.upper()) route = self._prefix + route route = route.strip('/') if route not in self._static_routes: self._static_routes[route] = {} self._static_routes[route][method] = handler def add_route(self, method: Union[str, Methods], route: str, handler: Callable): if isinstance(method, str): method = Methods(method.upper()) route = self._prefix + route route = route.strip('/') self.urls.append((method, route)) d = self._routes params = [] for portion in route.split('/'): if portion.startswith('{') and '}' in portion: sections = portion.lstrip('{').split('}', maxsplit=1) params.append(sections[0]) key = ID_KEY if sections[1]: if not sections[1].startswith(':'): raise NotAValidURLError( 'Cant have an id mixed with ' 'a static word without a colon') key += sections[1] else: key = portion if key not in d: d[key] = {} d = d[key] if method in d: raise ValueError(f"Duplicate route exists {method}") d[method] = handler, params def get(self, route: str, handler: Callable): self.add_route(Methods.GET, route, handler) def post(self, route: str, handler: Callable): self.add_route(Methods.POST, route, handler) def put(self, route: str, handler: Callable): self.add_route(Methods.PUT, route, handler) def patch(self, route: str, handler: Callable): self.add_route(Methods.PATCH, route, handler) def delete(self, route: str, handler: Callable): self.add_route(Methods.DELETE, route, handler) def head(self, route: str, handler: Callable): self.add_route(Methods.HEAD, route, handler) def options(self, route: str, handler: Callable): self.add_route(Methods.OPTIONS, route, handler) def add_get(self, route: str, handler: Callable): warnings.warn("add_get is deprecated, use get instead", DeprecationWarning) self.add_route(Methods.GET, route, handler) def add_post(self, route: str, handler: Callable): warnings.warn("add_post is deprecated, use post instead", DeprecationWarning) self.add_route(Methods.POST, route, handler) def add_put(self, route: str, handler: Callable): warnings.warn("add_put is deprecated, use put instead", DeprecationWarning) self.add_route(Methods.PUT, route, handler) def add_delete(self, route: str, handler: Callable): warnings.warn("add_delete is deprecated, use delete instead", DeprecationWarning) self.add_route(Methods.DELETE, route, handler) def add_patch(self, route: str, handler: Callable): warnings.warn("add_patch is deprecated, use patch instead", DeprecationWarning) self.add_route(Methods.PATCH, route, handler) def add_head(self, route: str, handler: Callable): warnings.warn("add_head is deprecated, use head instead", DeprecationWarning) self.add_route(Methods.HEAD, route, handler) def add_options(self, route: str, handler: Callable): warnings.warn("add_options is deprecated, use options instead", DeprecationWarning) self.add_route(Methods.OPTIONS, route, handler) def add_generic_options_handler(self, handler: Callable): """ Add a handler for all options requests. This WILL bypass middlewares """ self.options_handler = handler @contextmanager def prefix(self, prefix): """ Adds a prefix to routes contained within. """ original_prefix = self._prefix self._prefix += prefix yield self self._prefix = original_prefix PK!<_Qwaspy/transports/__init__.pyfrom .httptransport import HTTPTransport as HTTPToolsTransport from .httptransport import HTTPTransport, HTTPClientTransport from .rabbitmqtransport import RabbitMQTransport, RabbitMQClientTransport from .transportabc import TransportABC from .testtransport import TestTransport PK!<_Qwaspy/transports/__init__.pyfrom .httptransport import HTTPTransport as HTTPToolsTransport from .httptransport import HTTPTransport, HTTPClientTransport from .rabbitmqtransport import RabbitMQTransport, RabbitMQClientTransport from .transportabc import TransportABC from .testtransport import TestTransport PK!ׇ33!waspy/transports/httptransport.pyimport asyncio import traceback import logging import urllib.parse from http import HTTPStatus from httptools import HttpRequestParser, HttpResponseParser, HttpParserError, \ parse_url from ..webtypes import Request, Response from .transportabc import TransportABC, ClientTransportABC logger = logging.getLogger('waspy') class ClosedError(Exception): """ Error for closed connections """ class _HTTPClientConnection: slots = ('reader', 'writer', 'http_parser', '_done', '_data') def __init__(self): self.reader = None self.writer = None self.http_parser = HttpResponseParser(self) self.response = None self._data = b'' self._done = False async def connect(self, service, port, use_ssl): for _ in range(3): try: self.reader, self.writer = await \ asyncio.open_connection(service, port, ssl=use_ssl) return except ConnectionRefusedError: """ connection refused. Try again """ raise ConnectionRefusedError( f'Connection refused to "{service}" on port {port}') def send(self, method, path, headers, body): self.writer.write(f'{method.upper()} {path} HTTP/1.0\r\n' .encode('latin-1')) for header, value in headers: self.writer.write(f'{header}: {value}\r\n'.encode('latin-1')) self.writer.write(b'\r\n') if body: self.writer.write(body) async def get_response(self): while True: data = await self.reader.read(1064) self.http_parser.feed_data(data) if self._done: return self.response def close(self): self.writer.close() """ http parsing methods below """ def on_message_begin(self): self.response = Response() def on_header(self, name, value): name = name.decode('latin-1') value = value.decode() if name == 'X-Correlation-ID': self.response.correlation_id = value elif name.lower() == 'content-type': self.response.content_type = value else: self.response.headers[name] = value def on_headers_complete(self): self.response.status = HTTPStatus(self.http_parser.get_status_code()) def on_body(self, body): self._data += body def on_message_complete(self): self.response.body = self._data self._data = b'' self._done = True class HTTPClientTransport(ClientTransportABC): """Client implementation of the HTTP transport protocol""" def _get_connection_for_service(self, service): pass async def make_request(self, service, method, path, body=None, query=None, headers=None, correlation_id=None, content_type=None, port=80, **kwargs): # form request object if not path.startswith('/'): path = '/' + path path = path.replace(' ', '+') if headers is None: headers = {} use_ssl = service.startswith('https://') or port == 443 if service.startswith('http'): service = service.replace('http://', '').replace('https://', '') if use_ssl and port == 80: port = 443 if 'Host' not in headers and 'host' not in headers: headers['Host'] = service if port not in (80, 443): headers['Host'] += ':{}'.format(port) headers['Connection'] = 'close' headers.pop('connection', None) if correlation_id: headers['X-Correlation-Id'] = correlation_id if query: path += '?' + query if content_type: if body: # dont include content-type if there is no body headers['Content-Type'] = content_type headers.pop('content-type', None) headers.pop('content-length', None) headers.pop('Content-Length', None) if body: headers['Content-Length'] = str(len(body)) headers['User-Agent'] = headers.pop('user-agent', 'waspy-http-client') # now make a connection and send it connection = _HTTPClientConnection() await connection.connect(service, port, use_ssl) connection.send(method, path, headers.items(), body) try: result = await connection.get_response() finally: connection.close() return result class HTTPTransport(TransportABC): """ Server implementation of the HTTP transport protocol""" def get_client(self): return HTTPClientTransport() def __init__(self, port=8080, prefix=None, shutdown_grace_period=5, shutdown_wait_period=1): """ HTTP Transport for listening on http :param port: The port to lisen on (0.0.0.0 will always be used) :param prefix: the path prefix to remove from all url's :param shutdown_grace_period: Time to wait for server to shutdown before connections get forceably closed. The only way for connections to not be forcibly closed is to have some connection draining in front of the service for deploys. Most docker schedulers will do this for you. :param shutdown_wait_period: Time to wait after recieving the sigterm before starting shutdown """ self.port = port if prefix is None: prefix = '' self.prefix = prefix self._handler = None self._server = None self._loop = None self._done_future = asyncio.Future() self._connections = set() self.shutdown_grace_period = shutdown_grace_period self.shutdown_wait_period = shutdown_wait_period self.shutting_down = False self._config = {} def listen(self, *, loop: asyncio.AbstractEventLoop, config): self._loop = loop self._config = config if self._config['debug']: self.shutdown_grace_period = 0 self.shutdown_wait_period = 0 self._debug = True async def start(self, request_handler): self._handler = request_handler self._server = await self._loop.create_server( lambda: _HTTPServerProtocol(parent=self, loop=self._loop), host='0.0.0.0', port=self.port, reuse_address=True) print(f'-- Listening for HTTP on port {self.port} --') try: await self._done_future except asyncio.CancelledError: pass logger.warning("Shutting down HTTP transport") await asyncio.sleep(self.shutdown_wait_period) # wait for connections to stop times_no_connections = 0 for _ in range(self.shutdown_grace_period): if not self._connections: times_no_connections += 1 else: times_no_connections = 0 for con in self._connections: con.attempt_close() if times_no_connections > 3: # three seconds with no connections break await asyncio.sleep(1) # Shut the server down self._server.close() await self._server.wait_closed() async def handle_incoming_request(self, request): logger.debug('received incoming request via http: %s', request) response = await self._handler(request) return response def shutdown(self): self.shutting_down = True self._done_future.cancel() class _HTTPServerProtocol(asyncio.Protocol): """ HTTP Protocol handler. Should only be used by HTTPServerTransport """ __slots__ = ('_parent', '_transport', '_task', 'data', 'http_parser', 'request') def __init__(self, *, parent, loop): self._parent = parent self._transport = None self.data = None self.http_parser = HttpRequestParser(self) self.request = None self._loop = loop self._task: asyncio.Task = None """ The next 3 methods are for asyncio.Protocol handling """ def connection_made(self, transport): self._transport = transport self._parent._connections.add(self) def connection_lost(self, exc): self._parent._connections.discard(self) if self._task: self._task.cancel() self._transport = None def data_received(self, data): try: self.http_parser.feed_data(data) except HttpParserError as e: traceback.print_exc() logger.error('Bad http: %s', self.request) if self._transport: self.send_response( Response( status=400, body={ 'reason': 'Invalid HTTP', 'details': str(e) })) """ The following methods are for HTTP parsing (from httptools) """ def on_message_begin(self): self.request = Request() self.data = b'' def on_header(self, name, value): key = name.decode('latin-1').lower() if not value: value = b'' val = value.decode() self.request.headers[key] = val if key == 'x-correlation-id': self.request.correlation_id = val if key == 'content-type': self.request.content_type = val def on_headers_complete(self): self.request.method = self.http_parser.get_method().decode('latin-1') def on_body(self, body: bytes): self.data += body def on_message_complete(self): self.request.body = self.data task = self._loop.create_task( self._parent.handle_incoming_request(self.request)) task.add_done_callback(self.handle_response) self._task = task def on_url(self, url): url = url.replace(b'//', b'/') url = parse_url(url) if url.query: # query = urllib.parse.unquote(url.query.decode('latin-1')) self.request.query_string = url.query.decode('latin-1') path = urllib.parse.unquote(url.path.decode('latin-1')) if path.startswith(self._parent.prefix): path = path[len(self._parent.prefix):] self.request.path = path """ End parsing methods """ def handle_response(self, future): try: self.send_response(future.result()) except Exception: traceback.print_exc() self.send_response( Response( status=500, body={'reason': 'Something really bad happened'}, content_type=self.request.app.default_content_type )) def send_response(self, response): if response is None: # connection closed, no response return headers = 'HTTP/1.1 {status_code} {status_message}\r\n'.format( status_code=response.status.value, status_message=response.status.phrase, ) headers += 'Connection: close\r\n' # if self._parent.shutting_down: # headers += 'Connection: close\r\n' # else: # headers += 'Connection: keep-alive\r\n' # headers += 'Keep-Alive: timeout=5, max=50\r\n' if response.raw_body: headers += 'Content-Type: {}\r\n'.format(response.content_type) headers += 'Content-Length: {}\r\n'.format(len(response.raw_body)) if ('transfer-encoding' in response.headers or 'Transfer-Encoding' in response.headers): print('Httptoolstransport currently doesnt support ' 'chunked mode, attempting without.') response.headers.pop('transfer-encoding', None) response.headers.pop('Transfer-Encoding', None) else: headers += 'Content-Length: {}\r\n'.format(0) for header, value in response.headers.items(): if header in ('Content-Length', 'content-lenth'): continue headers += '{header}: {value}\r\n'.format( header=header, value=value) result = headers.encode('latin-1') + b'\r\n' if response.raw_body: result += response.raw_body try: self._transport.write(result) except AttributeError: # "NoneType has no attribute 'write'" because transport is closed logger.debug( 'Connection closed prematurely, most likely by client') self.request = 0 self.data = 0 self.attempt_close() def attempt_close(self): if self.request == 0 and self._transport: self._transport.close() PK!ׇ33!waspy/transports/httptransport.pyimport asyncio import traceback import logging import urllib.parse from http import HTTPStatus from httptools import HttpRequestParser, HttpResponseParser, HttpParserError, \ parse_url from ..webtypes import Request, Response from .transportabc import TransportABC, ClientTransportABC logger = logging.getLogger('waspy') class ClosedError(Exception): """ Error for closed connections """ class _HTTPClientConnection: slots = ('reader', 'writer', 'http_parser', '_done', '_data') def __init__(self): self.reader = None self.writer = None self.http_parser = HttpResponseParser(self) self.response = None self._data = b'' self._done = False async def connect(self, service, port, use_ssl): for _ in range(3): try: self.reader, self.writer = await \ asyncio.open_connection(service, port, ssl=use_ssl) return except ConnectionRefusedError: """ connection refused. Try again """ raise ConnectionRefusedError( f'Connection refused to "{service}" on port {port}') def send(self, method, path, headers, body): self.writer.write(f'{method.upper()} {path} HTTP/1.0\r\n' .encode('latin-1')) for header, value in headers: self.writer.write(f'{header}: {value}\r\n'.encode('latin-1')) self.writer.write(b'\r\n') if body: self.writer.write(body) async def get_response(self): while True: data = await self.reader.read(1064) self.http_parser.feed_data(data) if self._done: return self.response def close(self): self.writer.close() """ http parsing methods below """ def on_message_begin(self): self.response = Response() def on_header(self, name, value): name = name.decode('latin-1') value = value.decode() if name == 'X-Correlation-ID': self.response.correlation_id = value elif name.lower() == 'content-type': self.response.content_type = value else: self.response.headers[name] = value def on_headers_complete(self): self.response.status = HTTPStatus(self.http_parser.get_status_code()) def on_body(self, body): self._data += body def on_message_complete(self): self.response.body = self._data self._data = b'' self._done = True class HTTPClientTransport(ClientTransportABC): """Client implementation of the HTTP transport protocol""" def _get_connection_for_service(self, service): pass async def make_request(self, service, method, path, body=None, query=None, headers=None, correlation_id=None, content_type=None, port=80, **kwargs): # form request object if not path.startswith('/'): path = '/' + path path = path.replace(' ', '+') if headers is None: headers = {} use_ssl = service.startswith('https://') or port == 443 if service.startswith('http'): service = service.replace('http://', '').replace('https://', '') if use_ssl and port == 80: port = 443 if 'Host' not in headers and 'host' not in headers: headers['Host'] = service if port not in (80, 443): headers['Host'] += ':{}'.format(port) headers['Connection'] = 'close' headers.pop('connection', None) if correlation_id: headers['X-Correlation-Id'] = correlation_id if query: path += '?' + query if content_type: if body: # dont include content-type if there is no body headers['Content-Type'] = content_type headers.pop('content-type', None) headers.pop('content-length', None) headers.pop('Content-Length', None) if body: headers['Content-Length'] = str(len(body)) headers['User-Agent'] = headers.pop('user-agent', 'waspy-http-client') # now make a connection and send it connection = _HTTPClientConnection() await connection.connect(service, port, use_ssl) connection.send(method, path, headers.items(), body) try: result = await connection.get_response() finally: connection.close() return result class HTTPTransport(TransportABC): """ Server implementation of the HTTP transport protocol""" def get_client(self): return HTTPClientTransport() def __init__(self, port=8080, prefix=None, shutdown_grace_period=5, shutdown_wait_period=1): """ HTTP Transport for listening on http :param port: The port to lisen on (0.0.0.0 will always be used) :param prefix: the path prefix to remove from all url's :param shutdown_grace_period: Time to wait for server to shutdown before connections get forceably closed. The only way for connections to not be forcibly closed is to have some connection draining in front of the service for deploys. Most docker schedulers will do this for you. :param shutdown_wait_period: Time to wait after recieving the sigterm before starting shutdown """ self.port = port if prefix is None: prefix = '' self.prefix = prefix self._handler = None self._server = None self._loop = None self._done_future = asyncio.Future() self._connections = set() self.shutdown_grace_period = shutdown_grace_period self.shutdown_wait_period = shutdown_wait_period self.shutting_down = False self._config = {} def listen(self, *, loop: asyncio.AbstractEventLoop, config): self._loop = loop self._config = config if self._config['debug']: self.shutdown_grace_period = 0 self.shutdown_wait_period = 0 self._debug = True async def start(self, request_handler): self._handler = request_handler self._server = await self._loop.create_server( lambda: _HTTPServerProtocol(parent=self, loop=self._loop), host='0.0.0.0', port=self.port, reuse_address=True) print(f'-- Listening for HTTP on port {self.port} --') try: await self._done_future except asyncio.CancelledError: pass logger.warning("Shutting down HTTP transport") await asyncio.sleep(self.shutdown_wait_period) # wait for connections to stop times_no_connections = 0 for _ in range(self.shutdown_grace_period): if not self._connections: times_no_connections += 1 else: times_no_connections = 0 for con in self._connections: con.attempt_close() if times_no_connections > 3: # three seconds with no connections break await asyncio.sleep(1) # Shut the server down self._server.close() await self._server.wait_closed() async def handle_incoming_request(self, request): logger.debug('received incoming request via http: %s', request) response = await self._handler(request) return response def shutdown(self): self.shutting_down = True self._done_future.cancel() class _HTTPServerProtocol(asyncio.Protocol): """ HTTP Protocol handler. Should only be used by HTTPServerTransport """ __slots__ = ('_parent', '_transport', '_task', 'data', 'http_parser', 'request') def __init__(self, *, parent, loop): self._parent = parent self._transport = None self.data = None self.http_parser = HttpRequestParser(self) self.request = None self._loop = loop self._task: asyncio.Task = None """ The next 3 methods are for asyncio.Protocol handling """ def connection_made(self, transport): self._transport = transport self._parent._connections.add(self) def connection_lost(self, exc): self._parent._connections.discard(self) if self._task: self._task.cancel() self._transport = None def data_received(self, data): try: self.http_parser.feed_data(data) except HttpParserError as e: traceback.print_exc() logger.error('Bad http: %s', self.request) if self._transport: self.send_response( Response( status=400, body={ 'reason': 'Invalid HTTP', 'details': str(e) })) """ The following methods are for HTTP parsing (from httptools) """ def on_message_begin(self): self.request = Request() self.data = b'' def on_header(self, name, value): key = name.decode('latin-1').lower() if not value: value = b'' val = value.decode() self.request.headers[key] = val if key == 'x-correlation-id': self.request.correlation_id = val if key == 'content-type': self.request.content_type = val def on_headers_complete(self): self.request.method = self.http_parser.get_method().decode('latin-1') def on_body(self, body: bytes): self.data += body def on_message_complete(self): self.request.body = self.data task = self._loop.create_task( self._parent.handle_incoming_request(self.request)) task.add_done_callback(self.handle_response) self._task = task def on_url(self, url): url = url.replace(b'//', b'/') url = parse_url(url) if url.query: # query = urllib.parse.unquote(url.query.decode('latin-1')) self.request.query_string = url.query.decode('latin-1') path = urllib.parse.unquote(url.path.decode('latin-1')) if path.startswith(self._parent.prefix): path = path[len(self._parent.prefix):] self.request.path = path """ End parsing methods """ def handle_response(self, future): try: self.send_response(future.result()) except Exception: traceback.print_exc() self.send_response( Response( status=500, body={'reason': 'Something really bad happened'}, content_type=self.request.app.default_content_type )) def send_response(self, response): if response is None: # connection closed, no response return headers = 'HTTP/1.1 {status_code} {status_message}\r\n'.format( status_code=response.status.value, status_message=response.status.phrase, ) headers += 'Connection: close\r\n' # if self._parent.shutting_down: # headers += 'Connection: close\r\n' # else: # headers += 'Connection: keep-alive\r\n' # headers += 'Keep-Alive: timeout=5, max=50\r\n' if response.raw_body: headers += 'Content-Type: {}\r\n'.format(response.content_type) headers += 'Content-Length: {}\r\n'.format(len(response.raw_body)) if ('transfer-encoding' in response.headers or 'Transfer-Encoding' in response.headers): print('Httptoolstransport currently doesnt support ' 'chunked mode, attempting without.') response.headers.pop('transfer-encoding', None) response.headers.pop('Transfer-Encoding', None) else: headers += 'Content-Length: {}\r\n'.format(0) for header, value in response.headers.items(): if header in ('Content-Length', 'content-lenth'): continue headers += '{header}: {value}\r\n'.format( header=header, value=value) result = headers.encode('latin-1') + b'\r\n' if response.raw_body: result += response.raw_body try: self._transport.write(result) except AttributeError: # "NoneType has no attribute 'write'" because transport is closed logger.debug( 'Connection closed prematurely, most likely by client') self.request = 0 self.data = 0 self.attempt_close() def attempt_close(self): if self.request == 0 and self._transport: self._transport.close() PK!s"waspy/transports/rabbit_patches.py""" This is ugly, but you do what you gotta do So... on that note: Lets do some monkey patching!! We need to support mandatory bit, and handle returned messages, but aioamqp doesnt support it yet. (follow https://github.com/Polyconseil/aioamqp/pull/158) monkey patching _write_frame_awaiting_response fixes a waiter error. (follow PR here: https://github.com/Polyconseil/aioamqp/pull/159) """ import io import logging import aioamqp from aioamqp import frame as amqp_frame, channel from aioamqp import constants as amqp_constants logger = logging.getLogger(__name__) class ReturnEnvelope: __slots__ = ('reply_code', 'reply_text', 'exchange_name', 'routing_key') def __init__(self, reply_code, reply_text, exchange_name, routing_key): self.reply_code = reply_code self.reply_text = reply_text self.exchange_name = exchange_name self.routing_key = routing_key async def basic_return(self, frame): response = amqp_frame.AmqpDecoder(frame.payload) reply_code = response.read_short() reply_text = response.read_shortstr() exchange_name = response.read_shortstr() routing_key = response.read_shortstr() content_header_frame = await self.protocol.get_frame() buffer = io.BytesIO() while buffer.tell() < content_header_frame.body_size: content_body_frame = await self.protocol.get_frame() buffer.write(content_body_frame.payload) body = buffer.getvalue() envelope = ReturnEnvelope(reply_code, reply_text, exchange_name, routing_key) properties = content_header_frame.properties callback = self.return_callback if self.return_callback is None: # they have set mandatory bit, but havent added a callback logger.warning( 'You have received a returned message, but dont have a callback registered for returns.' ' Please set channel.return_callback') else: await callback(self, body, envelope, properties) async def dispatch_frame(self, frame): methods = { (amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_OPEN_OK): self.open_ok, (amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_FLOW_OK): self.flow_ok, (amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_CLOSE_OK): self.close_ok, (amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_CLOSE): self.server_channel_close, (amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_DECLARE_OK): self.exchange_declare_ok, (amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_BIND_OK): self.exchange_bind_ok, (amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_UNBIND_OK): self.exchange_unbind_ok, (amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_DELETE_OK): self.exchange_delete_ok, (amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_DECLARE_OK): self.queue_declare_ok, (amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_DELETE_OK): self.queue_delete_ok, (amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_BIND_OK): self.queue_bind_ok, (amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_UNBIND_OK): self.queue_unbind_ok, (amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_PURGE_OK): self.queue_purge_ok, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_QOS_OK): self.basic_qos_ok, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_CONSUME_OK): self.basic_consume_ok, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_CANCEL_OK): self.basic_cancel_ok, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_GET_OK): self.basic_get_ok, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_GET_EMPTY): self.basic_get_empty, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_DELIVER): self.basic_deliver, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_CANCEL): self.server_basic_cancel, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_ACK): self.basic_server_ack, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_NACK): self.basic_server_nack, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_RECOVER_OK): self.basic_recover_ok, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_RETURN): self.basic_return, (amqp_constants.CLASS_CONFIRM, amqp_constants.CONFIRM_SELECT_OK): self.confirm_select_ok, } if (frame.class_id, frame.method_id) not in methods: raise NotImplementedError("Frame (%s, %s) is not implemented" % (frame.class_id, frame.method_id)) await methods[(frame.class_id, frame.method_id)](frame) async def _write_frame_awaiting_response(self, waiter_id, frame, request, no_wait, check_open=True, drain=True): '''Write a frame and set a waiter for the response (unless no_wait is set)''' if no_wait: await self._write_frame(frame, request, check_open=check_open, drain=drain) return None f = self._set_waiter(waiter_id) try: await self._write_frame(frame, request, check_open=check_open, drain=drain) except Exception: self._get_waiter(waiter_id) f.cancel() raise result = await f try: self._get_waiter(waiter_id) except aioamqp.SynchronizationError: # no waiter to get pass return result channel.Channel._write_frame_awaiting_response = _write_frame_awaiting_response channel.Channel.dispatch_frame = dispatch_frame channel.Channel.basic_return = basic_returnPK!s"waspy/transports/rabbit_patches.py""" This is ugly, but you do what you gotta do So... on that note: Lets do some monkey patching!! We need to support mandatory bit, and handle returned messages, but aioamqp doesnt support it yet. (follow https://github.com/Polyconseil/aioamqp/pull/158) monkey patching _write_frame_awaiting_response fixes a waiter error. (follow PR here: https://github.com/Polyconseil/aioamqp/pull/159) """ import io import logging import aioamqp from aioamqp import frame as amqp_frame, channel from aioamqp import constants as amqp_constants logger = logging.getLogger(__name__) class ReturnEnvelope: __slots__ = ('reply_code', 'reply_text', 'exchange_name', 'routing_key') def __init__(self, reply_code, reply_text, exchange_name, routing_key): self.reply_code = reply_code self.reply_text = reply_text self.exchange_name = exchange_name self.routing_key = routing_key async def basic_return(self, frame): response = amqp_frame.AmqpDecoder(frame.payload) reply_code = response.read_short() reply_text = response.read_shortstr() exchange_name = response.read_shortstr() routing_key = response.read_shortstr() content_header_frame = await self.protocol.get_frame() buffer = io.BytesIO() while buffer.tell() < content_header_frame.body_size: content_body_frame = await self.protocol.get_frame() buffer.write(content_body_frame.payload) body = buffer.getvalue() envelope = ReturnEnvelope(reply_code, reply_text, exchange_name, routing_key) properties = content_header_frame.properties callback = self.return_callback if self.return_callback is None: # they have set mandatory bit, but havent added a callback logger.warning( 'You have received a returned message, but dont have a callback registered for returns.' ' Please set channel.return_callback') else: await callback(self, body, envelope, properties) async def dispatch_frame(self, frame): methods = { (amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_OPEN_OK): self.open_ok, (amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_FLOW_OK): self.flow_ok, (amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_CLOSE_OK): self.close_ok, (amqp_constants.CLASS_CHANNEL, amqp_constants.CHANNEL_CLOSE): self.server_channel_close, (amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_DECLARE_OK): self.exchange_declare_ok, (amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_BIND_OK): self.exchange_bind_ok, (amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_UNBIND_OK): self.exchange_unbind_ok, (amqp_constants.CLASS_EXCHANGE, amqp_constants.EXCHANGE_DELETE_OK): self.exchange_delete_ok, (amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_DECLARE_OK): self.queue_declare_ok, (amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_DELETE_OK): self.queue_delete_ok, (amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_BIND_OK): self.queue_bind_ok, (amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_UNBIND_OK): self.queue_unbind_ok, (amqp_constants.CLASS_QUEUE, amqp_constants.QUEUE_PURGE_OK): self.queue_purge_ok, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_QOS_OK): self.basic_qos_ok, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_CONSUME_OK): self.basic_consume_ok, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_CANCEL_OK): self.basic_cancel_ok, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_GET_OK): self.basic_get_ok, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_GET_EMPTY): self.basic_get_empty, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_DELIVER): self.basic_deliver, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_CANCEL): self.server_basic_cancel, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_ACK): self.basic_server_ack, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_NACK): self.basic_server_nack, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_RECOVER_OK): self.basic_recover_ok, (amqp_constants.CLASS_BASIC, amqp_constants.BASIC_RETURN): self.basic_return, (amqp_constants.CLASS_CONFIRM, amqp_constants.CONFIRM_SELECT_OK): self.confirm_select_ok, } if (frame.class_id, frame.method_id) not in methods: raise NotImplementedError("Frame (%s, %s) is not implemented" % (frame.class_id, frame.method_id)) await methods[(frame.class_id, frame.method_id)](frame) async def _write_frame_awaiting_response(self, waiter_id, frame, request, no_wait, check_open=True, drain=True): '''Write a frame and set a waiter for the response (unless no_wait is set)''' if no_wait: await self._write_frame(frame, request, check_open=check_open, drain=drain) return None f = self._set_waiter(waiter_id) try: await self._write_frame(frame, request, check_open=check_open, drain=drain) except Exception: self._get_waiter(waiter_id) f.cancel() raise result = await f try: self._get_waiter(waiter_id) except aioamqp.SynchronizationError: # no waiter to get pass return result channel.Channel._write_frame_awaiting_response = _write_frame_awaiting_response channel.Channel.dispatch_frame = dispatch_frame channel.Channel.basic_return = basic_returnPK!Z(G@GG%waspy/transports/rabbitmqtransport.pyfrom . import rabbit_patches import asyncio import logging import urllib.parse import uuid import os import aioamqp import re from aioamqp import protocol from aioamqp.channel import Channel from .transportabc import TransportABC, ClientTransportABC, WorkerTransportABC from ..webtypes import Request, Response, Methods from ..exceptions import NotRoutableError from waspy.listeners.transport_listener_abc import TransportListenerABC logger = logging.getLogger("waspy") class NackMePleaseError(Exception): """ This is a dirty dirty dirty dirty hack that is in place until I have time to add a real worker tier type connector support in waspy TODO: Please get rid of this as soon as your able to """ def parse_rabbit_message(body, envelope, properties): return Response() class RabbitChannelMixIn: def __init__(self): self.channel = None self._channel_ready = asyncio.Event() self.channels = {} async def _bootstrap_channel(self, channel): raise NotImplementedError async def _handle_rabbit_error(self, exception): if type(exception) == aioamqp.ChannelClosed: if self._protocol and self._protocol.state not in (protocol.CLOSING, protocol.CLOSED): logger.warning("RabbitMQ channel closed... Creating new channel") self._channel_ready.clear() self.channel = None channel = await self._protocol.channel() await self._bootstrap_channel(channel) self._channel_ready.set() elif type(exception) == aioamqp.AmqpClosedConnection: logger.error("RabbitMQ connection closed") else: logger.error(f"Unknown exception occurred: {exception}") raise exception async def create_channel(self) -> aioamqp.channel.Channel: channel = await self._protocol.channel() self.channels[channel.channel_id] = channel return channel async def close_channel(self, channel: aioamqp.channel.Channel) -> None: del self.channels[channel.channel_id] await channel.close() async def disconnect(self): for channel in self.channels.values(): if channel.is_open: await channel.close() self.channels = {} if self._protocol and self._protocol.state != protocol.CLOSED: if self._protocol.state == protocol.CLOSING: await self._protocol.wait_closed() else: await self._protocol.close() if self._transport: self._transport.close() async def connect(self, loop=None): async def do_connect(): if self.channel and self.channel.is_open: return logger.warning('Establishing new connection') if self._protocol: await self.disconnect() if os.getenv('DEBUG', 'false') == 'true': # todo: should make this use config and logging, and not env vars print(dict(host=self.host, port=self.port, virtualhost=self.virtualhost, login=self.username, password='*******', ssl=self.ssl, verify_ssl=self.verify_ssl, heartbeat=self.heartbeat, on_error=self._handle_rabbit_error, loop=loop)) self._transport, self._protocol = await aioamqp.connect( host=self.host, port=self.port, virtualhost=self.virtualhost, login=self.username, password=self.password, ssl=self.ssl, verify_ssl=self.verify_ssl, heartbeat=self.heartbeat, on_error=self._handle_rabbit_error, loop=loop ) channel = await self._protocol.channel() await self._bootstrap_channel(channel) self._channel_ready.set() async def reconnect(): try: while not self._closing: await do_connect() if hasattr(self, 'listeners'): for listener in self.listeners: channel = await self.create_channel() await listener.set_channel(channel) await listener.start() await self._protocol.wait_closed() self._channel_ready.clear() self.channel = None await self.disconnect() finally: await self.disconnect() asyncio.ensure_future(reconnect()) class RabbitMQClientTransport(ClientTransportABC, RabbitChannelMixIn): _CLOSING_SENTINAL = object() def __init__(self, *, url=None, port=5672, virtualhost='/', username='guest', password='guest', ssl=False, verify_ssl=True, heartbeat=20): super().__init__() self._transport = None self._protocol = None self._response_futures = {} self.host = url self.port = port self.virtualhost = virtualhost self.username = username self.password = password self.ssl = ssl self.verify_ssl = verify_ssl self.response_queue_name = str(uuid.uuid1()).encode() self._consumer_tag = None self._closing = False self.channel = None self.heartbeat = heartbeat self._connected = False if not url: raise TypeError("RabbitMqClientTransport() missing 1 required keyword-only argument: 'url'") async def make_request(self, service: str, method: str, path: str, body: bytes = None, query: str = None, headers: dict = None, correlation_id: str = None, content_type: str = None, exchange: str = 'amq.topic', timeout: int = 30, mandatory: bool = False, **kwargs): if not self._connected: self._connected = True asyncio.ensure_future(self.connect()) await self._channel_ready.wait() if correlation_id is None: correlation_id = str(uuid.uuid4()) # need to use `?` to represent `.` in rabbit # since its not valid in a path, it should work correctly everywhere path = path.replace('.', '?') # now turn slashes into dots for rabbit style paths path = path.replace('/', '.').lstrip('.') if method != 'PUBLISH': path = f'{method.lower()}.' + path if headers is None: headers = {} if query: headers['x-wasp-query-string'] = query if not body: body = b'null' message_id = str(uuid.uuid4()) properties = { 'headers': headers, 'correlation_id': correlation_id, 'message_id': message_id, 'type': method, 'app_id': 'test', } if method != 'PUBLISH': properties['reply_to'] = self.response_queue_name properties['expiration']: str(timeout * 1000) if content_type: properties['content_type'] = content_type for i in range(3): # retry messages on closed channels print(i) try: await self.channel.basic_publish(exchange_name=exchange, routing_key=path, properties=properties, payload=body, mandatory=mandatory) except aioamqp.ChannelClosed as e: """ Usually this means that rabbitmq closed the connection, because something was bad, such as the exchange name, or something """ await self._handle_rabbit_error(e) if i == 2: raise else: break if method != 'PUBLISH': future = asyncio.Future() self._response_futures[message_id] = future return await future async def _bootstrap_channel(self, channel: Channel): if self.channel == channel: logger.warning("somehow the channels are the same on a bootstrap") if self.channel and self.channel.is_open: await self.channel.close() self.channel = channel await self.channel.queue_declare(queue_name=self.response_queue_name, durable=False, exclusive=False, auto_delete=True) self.channel.return_callback = self.handle_return try: self._consumer_tag = (await self.channel.basic_consume( self.handle_responses, queue_name=self.response_queue_name, no_ack=True)).get('consumer_tag') except aioamqp.SynchronizationError as e: logger.exception('Channel already consuming') raise async def handle_responses(self, channel, body, envelope, properties): future = self._response_futures.pop(properties.message_id) headers = properties.headers status = headers.pop('Status') response = Response(headers=headers, correlation_id=properties.correlation_id, body=body, status=int(status), content_type=properties.content_type) if not future.done(): future.set_result(response) async def handle_return(self, channel, body, envelope, properties): future = self._response_futures.pop(properties.message_id, None) if not future: logger.warning('Got a returned message with nowhere to send it') return if envelope.reply_code == 312: # no route future.set_exception(NotRoutableError()) else: logger.error(f'Got a return with an unknown reply code: ' f'{envelope.reply_code}') async def close(self): self._closing = True await self.disconnect() class RabbitMQTransport(TransportABC, RabbitChannelMixIn): def __init__(self, *, url, port=5672, queue='', virtualhost='/', username='guest', password='guest', ssl=False, verify_ssl=True, create_queue=True, use_acks=False, heartbeat=20): super().__init__() self.host = url self.port = port self.virtualhost = virtualhost self.queue = queue self.username = username self.password = password self.ssl = ssl self.verify_ssl = verify_ssl self.create_queue = create_queue self._use_acks = use_acks self._transport = None self._protocol = None self.channel = None self._app = None self._loop = None self._consumer_tag = None self._counter = 0 self._handler = None self._done_future = asyncio.Future() self._closing = False self._client = None self.heartbeat = heartbeat self._config = {} self.listeners = [] def get_client(self): if not self._client: # TODO: not ideal, the client/server should ideally share a channel # or at least a connection self._client = RabbitMQClientTransport( url=self.host, port=self.port, virtualhost=self.virtualhost, username=self.username, password=self.password, ssl=self.ssl, verify_ssl=self.verify_ssl) return self._client async def declare_exchange(self): pass async def declare_queue(self): pass async def bind_to_exchange(self, *, exchange, routing_key): await self.channel.queue_bind(exchange_name=exchange, queue_name=self.queue, routing_key=routing_key) async def register_router(self, router, exchange='amq.topic'): if not self.channel: # Something weird is going on here? return for topic in (parse_url_to_topic(*url) for url in router.urls): await self.bind_to_exchange(exchange=exchange, routing_key=topic) async def start(self, handler): print(f"-- Listening for rabbitmq messages on queue {self.queue} --") self._handler = handler await self._channel_ready.wait() # channel hasn't actually been bootstraped yet await self._bootstrap_channel(self.channel) try: await self._done_future except asyncio.CancelledError: pass # shutting down logger.warning("Shutting down rabbitmq transport") await self.channel.basic_cancel(self._consumer_tag) await self.close() while self._counter > 0: await asyncio.sleep(1) def listen(self, *, loop, config): loop.create_task(self.connect(loop=loop)) self._config = config async def setup(): await self._channel_ready.wait() if self.create_queue: await self.channel.queue_declare(queue_name=self.queue) loop.run_until_complete(setup()) async def close(self): self._closing = True if self._client: await self._client.close() await self.disconnect() async def handle_request(self, channel: Channel, body, envelope, properties, futurize=True): """ the 'futurize' param is simply because aioamqp doesnt send another job until this method returns (completes), so we ensure the future of ourselves and return immediately so we can handle many requests at a time. """ if futurize: asyncio.ensure_future( self.handle_request(channel, body, envelope, properties, futurize=False)) return self._counter += 1 headers = properties.headers or {} query = headers.pop('x-wasp-query-string', '').lstrip('?') correlation_id = properties.correlation_id message_id = properties.message_id reply_to = properties.reply_to route = envelope.routing_key method, path = route.split('.', 1) try: method = Methods(method.upper()) except ValueError: path = route method = 'POST' path = path.replace('.', '/') # need to use `?` to represent `.` in rabbit # since its not valid in a path, it should work correctly everywhere path = path.replace('?', '.') path = urllib.parse.unquote(path) request = Request( headers=headers, path=path, correlation_id=correlation_id, method=method, query_string=query, body=body, ) if properties.content_type: headers['content-type'] = properties.content_type request.content_type = properties.content_type if properties.content_encoding: headers['content-encoding'] = properties.content_encoding logger.debug('received incoming request via rabbitmq: %s', request) response = await self._handler(request) if response is None: # task got cancelled. Dont send a response. return if reply_to: response.headers['Status'] = str(response.status.value) payload = response.raw_body or b'null' properties = { 'correlation_id': response.correlation_id, 'headers': response.headers, 'content_type': response.content_type, 'message_id': message_id, 'expiration': '30000', } await self._channel_ready.wait() await channel.basic_publish(exchange_name='', payload=payload, routing_key=reply_to, properties=properties) if self._use_acks: await self.channel.basic_client_ack(delivery_tag=envelope.delivery_tag) self._counter -= 1 def shutdown(self): self._done_future.cancel() async def _bootstrap_channel(self, channel): self.channel = channel if self._handler is None: return await self.channel.basic_qos(prefetch_count=1) resp = await self.channel.basic_consume( self.handle_request, queue_name=self.queue, no_ack=not self._use_acks ) self._consumer_tag = resp.get('consumer_tag') def add_listener(self, listener: TransportListenerABC): self.listeners.append(listener) listener.set_transport(self) def parse_url_to_topic(method, route): """ Transforms a URL to a topic. `GET /bar/{id}` -> `get.bar.*` `POST /bar/{id}` -> `post.bar.*` `GET /foo/bar/{id}/baz` -? `get.foo.bar.*.baz` Possible gotchas `GET /foo/{id}` -> `get.foo.*` `GET /foo/{id}:action` -> `get.foo.*` However, once it hits the service the router will be able to distinguish the two requests. """ route = route.replace('.', '?') route = route.replace('/', '.').strip('.') topic = f'{method.value.lower()}.{route}' # need to replace `{id}` and `{id}:some_method` with just `*` return re.sub(r"\.\{[^\}]*\}[:\w\d_-]*", ".*", topic) PK!Z(G@GG%waspy/transports/rabbitmqtransport.pyfrom . import rabbit_patches import asyncio import logging import urllib.parse import uuid import os import aioamqp import re from aioamqp import protocol from aioamqp.channel import Channel from .transportabc import TransportABC, ClientTransportABC, WorkerTransportABC from ..webtypes import Request, Response, Methods from ..exceptions import NotRoutableError from waspy.listeners.transport_listener_abc import TransportListenerABC logger = logging.getLogger("waspy") class NackMePleaseError(Exception): """ This is a dirty dirty dirty dirty hack that is in place until I have time to add a real worker tier type connector support in waspy TODO: Please get rid of this as soon as your able to """ def parse_rabbit_message(body, envelope, properties): return Response() class RabbitChannelMixIn: def __init__(self): self.channel = None self._channel_ready = asyncio.Event() self.channels = {} async def _bootstrap_channel(self, channel): raise NotImplementedError async def _handle_rabbit_error(self, exception): if type(exception) == aioamqp.ChannelClosed: if self._protocol and self._protocol.state not in (protocol.CLOSING, protocol.CLOSED): logger.warning("RabbitMQ channel closed... Creating new channel") self._channel_ready.clear() self.channel = None channel = await self._protocol.channel() await self._bootstrap_channel(channel) self._channel_ready.set() elif type(exception) == aioamqp.AmqpClosedConnection: logger.error("RabbitMQ connection closed") else: logger.error(f"Unknown exception occurred: {exception}") raise exception async def create_channel(self) -> aioamqp.channel.Channel: channel = await self._protocol.channel() self.channels[channel.channel_id] = channel return channel async def close_channel(self, channel: aioamqp.channel.Channel) -> None: del self.channels[channel.channel_id] await channel.close() async def disconnect(self): for channel in self.channels.values(): if channel.is_open: await channel.close() self.channels = {} if self._protocol and self._protocol.state != protocol.CLOSED: if self._protocol.state == protocol.CLOSING: await self._protocol.wait_closed() else: await self._protocol.close() if self._transport: self._transport.close() async def connect(self, loop=None): async def do_connect(): if self.channel and self.channel.is_open: return logger.warning('Establishing new connection') if self._protocol: await self.disconnect() if os.getenv('DEBUG', 'false') == 'true': # todo: should make this use config and logging, and not env vars print(dict(host=self.host, port=self.port, virtualhost=self.virtualhost, login=self.username, password='*******', ssl=self.ssl, verify_ssl=self.verify_ssl, heartbeat=self.heartbeat, on_error=self._handle_rabbit_error, loop=loop)) self._transport, self._protocol = await aioamqp.connect( host=self.host, port=self.port, virtualhost=self.virtualhost, login=self.username, password=self.password, ssl=self.ssl, verify_ssl=self.verify_ssl, heartbeat=self.heartbeat, on_error=self._handle_rabbit_error, loop=loop ) channel = await self._protocol.channel() await self._bootstrap_channel(channel) self._channel_ready.set() async def reconnect(): try: while not self._closing: await do_connect() if hasattr(self, 'listeners'): for listener in self.listeners: channel = await self.create_channel() await listener.set_channel(channel) await listener.start() await self._protocol.wait_closed() self._channel_ready.clear() self.channel = None await self.disconnect() finally: await self.disconnect() asyncio.ensure_future(reconnect()) class RabbitMQClientTransport(ClientTransportABC, RabbitChannelMixIn): _CLOSING_SENTINAL = object() def __init__(self, *, url=None, port=5672, virtualhost='/', username='guest', password='guest', ssl=False, verify_ssl=True, heartbeat=20): super().__init__() self._transport = None self._protocol = None self._response_futures = {} self.host = url self.port = port self.virtualhost = virtualhost self.username = username self.password = password self.ssl = ssl self.verify_ssl = verify_ssl self.response_queue_name = str(uuid.uuid1()).encode() self._consumer_tag = None self._closing = False self.channel = None self.heartbeat = heartbeat self._connected = False if not url: raise TypeError("RabbitMqClientTransport() missing 1 required keyword-only argument: 'url'") async def make_request(self, service: str, method: str, path: str, body: bytes = None, query: str = None, headers: dict = None, correlation_id: str = None, content_type: str = None, exchange: str = 'amq.topic', timeout: int = 30, mandatory: bool = False, **kwargs): if not self._connected: self._connected = True asyncio.ensure_future(self.connect()) await self._channel_ready.wait() if correlation_id is None: correlation_id = str(uuid.uuid4()) # need to use `?` to represent `.` in rabbit # since its not valid in a path, it should work correctly everywhere path = path.replace('.', '?') # now turn slashes into dots for rabbit style paths path = path.replace('/', '.').lstrip('.') if method != 'PUBLISH': path = f'{method.lower()}.' + path if headers is None: headers = {} if query: headers['x-wasp-query-string'] = query if not body: body = b'null' message_id = str(uuid.uuid4()) properties = { 'headers': headers, 'correlation_id': correlation_id, 'message_id': message_id, 'type': method, 'app_id': 'test', } if method != 'PUBLISH': properties['reply_to'] = self.response_queue_name properties['expiration']: str(timeout * 1000) if content_type: properties['content_type'] = content_type for i in range(3): # retry messages on closed channels print(i) try: await self.channel.basic_publish(exchange_name=exchange, routing_key=path, properties=properties, payload=body, mandatory=mandatory) except aioamqp.ChannelClosed as e: """ Usually this means that rabbitmq closed the connection, because something was bad, such as the exchange name, or something """ await self._handle_rabbit_error(e) if i == 2: raise else: break if method != 'PUBLISH': future = asyncio.Future() self._response_futures[message_id] = future return await future async def _bootstrap_channel(self, channel: Channel): if self.channel == channel: logger.warning("somehow the channels are the same on a bootstrap") if self.channel and self.channel.is_open: await self.channel.close() self.channel = channel await self.channel.queue_declare(queue_name=self.response_queue_name, durable=False, exclusive=False, auto_delete=True) self.channel.return_callback = self.handle_return try: self._consumer_tag = (await self.channel.basic_consume( self.handle_responses, queue_name=self.response_queue_name, no_ack=True)).get('consumer_tag') except aioamqp.SynchronizationError as e: logger.exception('Channel already consuming') raise async def handle_responses(self, channel, body, envelope, properties): future = self._response_futures.pop(properties.message_id) headers = properties.headers status = headers.pop('Status') response = Response(headers=headers, correlation_id=properties.correlation_id, body=body, status=int(status), content_type=properties.content_type) if not future.done(): future.set_result(response) async def handle_return(self, channel, body, envelope, properties): future = self._response_futures.pop(properties.message_id, None) if not future: logger.warning('Got a returned message with nowhere to send it') return if envelope.reply_code == 312: # no route future.set_exception(NotRoutableError()) else: logger.error(f'Got a return with an unknown reply code: ' f'{envelope.reply_code}') async def close(self): self._closing = True await self.disconnect() class RabbitMQTransport(TransportABC, RabbitChannelMixIn): def __init__(self, *, url, port=5672, queue='', virtualhost='/', username='guest', password='guest', ssl=False, verify_ssl=True, create_queue=True, use_acks=False, heartbeat=20): super().__init__() self.host = url self.port = port self.virtualhost = virtualhost self.queue = queue self.username = username self.password = password self.ssl = ssl self.verify_ssl = verify_ssl self.create_queue = create_queue self._use_acks = use_acks self._transport = None self._protocol = None self.channel = None self._app = None self._loop = None self._consumer_tag = None self._counter = 0 self._handler = None self._done_future = asyncio.Future() self._closing = False self._client = None self.heartbeat = heartbeat self._config = {} self.listeners = [] def get_client(self): if not self._client: # TODO: not ideal, the client/server should ideally share a channel # or at least a connection self._client = RabbitMQClientTransport( url=self.host, port=self.port, virtualhost=self.virtualhost, username=self.username, password=self.password, ssl=self.ssl, verify_ssl=self.verify_ssl) return self._client async def declare_exchange(self): pass async def declare_queue(self): pass async def bind_to_exchange(self, *, exchange, routing_key): await self.channel.queue_bind(exchange_name=exchange, queue_name=self.queue, routing_key=routing_key) async def register_router(self, router, exchange='amq.topic'): if not self.channel: # Something weird is going on here? return for topic in (parse_url_to_topic(*url) for url in router.urls): await self.bind_to_exchange(exchange=exchange, routing_key=topic) async def start(self, handler): print(f"-- Listening for rabbitmq messages on queue {self.queue} --") self._handler = handler await self._channel_ready.wait() # channel hasn't actually been bootstraped yet await self._bootstrap_channel(self.channel) try: await self._done_future except asyncio.CancelledError: pass # shutting down logger.warning("Shutting down rabbitmq transport") await self.channel.basic_cancel(self._consumer_tag) await self.close() while self._counter > 0: await asyncio.sleep(1) def listen(self, *, loop, config): loop.create_task(self.connect(loop=loop)) self._config = config async def setup(): await self._channel_ready.wait() if self.create_queue: await self.channel.queue_declare(queue_name=self.queue) loop.run_until_complete(setup()) async def close(self): self._closing = True if self._client: await self._client.close() await self.disconnect() async def handle_request(self, channel: Channel, body, envelope, properties, futurize=True): """ the 'futurize' param is simply because aioamqp doesnt send another job until this method returns (completes), so we ensure the future of ourselves and return immediately so we can handle many requests at a time. """ if futurize: asyncio.ensure_future( self.handle_request(channel, body, envelope, properties, futurize=False)) return self._counter += 1 headers = properties.headers or {} query = headers.pop('x-wasp-query-string', '').lstrip('?') correlation_id = properties.correlation_id message_id = properties.message_id reply_to = properties.reply_to route = envelope.routing_key method, path = route.split('.', 1) try: method = Methods(method.upper()) except ValueError: path = route method = 'POST' path = path.replace('.', '/') # need to use `?` to represent `.` in rabbit # since its not valid in a path, it should work correctly everywhere path = path.replace('?', '.') path = urllib.parse.unquote(path) request = Request( headers=headers, path=path, correlation_id=correlation_id, method=method, query_string=query, body=body, ) if properties.content_type: headers['content-type'] = properties.content_type request.content_type = properties.content_type if properties.content_encoding: headers['content-encoding'] = properties.content_encoding logger.debug('received incoming request via rabbitmq: %s', request) response = await self._handler(request) if response is None: # task got cancelled. Dont send a response. return if reply_to: response.headers['Status'] = str(response.status.value) payload = response.raw_body or b'null' properties = { 'correlation_id': response.correlation_id, 'headers': response.headers, 'content_type': response.content_type, 'message_id': message_id, 'expiration': '30000', } await self._channel_ready.wait() await channel.basic_publish(exchange_name='', payload=payload, routing_key=reply_to, properties=properties) if self._use_acks: await self.channel.basic_client_ack(delivery_tag=envelope.delivery_tag) self._counter -= 1 def shutdown(self): self._done_future.cancel() async def _bootstrap_channel(self, channel): self.channel = channel if self._handler is None: return await self.channel.basic_qos(prefetch_count=1) resp = await self.channel.basic_consume( self.handle_request, queue_name=self.queue, no_ack=not self._use_acks ) self._consumer_tag = resp.get('consumer_tag') def add_listener(self, listener: TransportListenerABC): self.listeners.append(listener) listener.set_transport(self) def parse_url_to_topic(method, route): """ Transforms a URL to a topic. `GET /bar/{id}` -> `get.bar.*` `POST /bar/{id}` -> `post.bar.*` `GET /foo/bar/{id}/baz` -? `get.foo.bar.*.baz` Possible gotchas `GET /foo/{id}` -> `get.foo.*` `GET /foo/{id}:action` -> `get.foo.*` However, once it hits the service the router will be able to distinguish the two requests. """ route = route.replace('.', '?') route = route.replace('/', '.').strip('.') topic = f'{method.value.lower()}.{route}' # need to replace `{id}` and `{id}:some_method` with just `*` return re.sub(r"\.\{[^\}]*\}[:\w\d_-]*", ".*", topic) PK!&ee!waspy/transports/testtransport.pyimport asyncio import json from .. import webtypes from .transportabc import TransportABC, ClientTransportABC class TestClientTransport(ClientTransportABC): async def make_request(self, service: str, method: str, path: str, body: bytes = None, query: str = None, headers: dict = None, correlation_id: str = None, content_type: str = None, **kwargs) -> webtypes.Response: pass class TestTransport(TransportABC): def __init__(self, *args, **kwargs): self.app = None self.loop = None self.handler = None def listen(self, *, loop, config): self.loop = loop def run_app(self, app): self.app = app app.transport = (self,) app.shutdown = lambda: '' app.run() # now patch app to have send_request methods as well def send_request_for_app(request): return self.send_request(request) async def send_async_request_for_app(request): return await self.send_async_request(request) app.send_request = send_request_for_app app.send_async_request = send_async_request_for_app def send_request(self, request): loop = asyncio.get_event_loop() return loop.run_until_complete(self.send_async_request(request)) async def send_async_request(self, request): if request.body and isinstance(request.body, dict): request.body = json.dumps(request.body) if isinstance(request.body, str): request.body = request.body.encode() response = await self.handler(request) response.body = response.body return response async def start(self, request_handler: callable): self.handler = request_handler def shutdown(self): pass def get_client(self): return TestClientTransport() PK!&ee!waspy/transports/testtransport.pyimport asyncio import json from .. import webtypes from .transportabc import TransportABC, ClientTransportABC class TestClientTransport(ClientTransportABC): async def make_request(self, service: str, method: str, path: str, body: bytes = None, query: str = None, headers: dict = None, correlation_id: str = None, content_type: str = None, **kwargs) -> webtypes.Response: pass class TestTransport(TransportABC): def __init__(self, *args, **kwargs): self.app = None self.loop = None self.handler = None def listen(self, *, loop, config): self.loop = loop def run_app(self, app): self.app = app app.transport = (self,) app.shutdown = lambda: '' app.run() # now patch app to have send_request methods as well def send_request_for_app(request): return self.send_request(request) async def send_async_request_for_app(request): return await self.send_async_request(request) app.send_request = send_request_for_app app.send_async_request = send_async_request_for_app def send_request(self, request): loop = asyncio.get_event_loop() return loop.run_until_complete(self.send_async_request(request)) async def send_async_request(self, request): if request.body and isinstance(request.body, dict): request.body = json.dumps(request.body) if isinstance(request.body, str): request.body = request.body.encode() response = await self.handler(request) response.body = response.body return response async def start(self, request_handler: callable): self.handler = request_handler def shutdown(self): pass def get_client(self): return TestClientTransport() PK!vYr waspy/transports/transportabc.py""" A place for abstract transport base classes """ from abc import ABC, abstractmethod from .. import webtypes class ClientTransportABC(ABC): """Abstract Base Class for implementing client transports""" @abstractmethod async def make_request(self, service: str, method: str, path: str, body: bytes=None, query: str=None, headers: dict=None, correlation_id: str=None, content_type: str=None, timeout:int = 30, **kwargs) -> webtypes.Response: """ Method for actually making a request :param service: service to make request too :param method: HTTP method: GET/PUT/POST etc. :param path: routing path. Should support dots `foo.2.bars` or slashes `foo/2/bars` :param body: request body. Bytes-like object :param query: query string. Example: `foo=bar&cabbage=green` :param headers: Dictionary of headers :param correlation_id: :param content_type: example: `application/json` :param timeout: time to wait for response in seconds before getting an asyncio.TimeoutError :param kwargs: Should except **kwargs for compatability for other possible options on other transports (for example, http might need a `port` option) :return: """ class PubSubTransportABC(ABC): """ Abstract Base Class for implementing pubsub client transports""" class TransportABC(ABC): """ Abstract Base Class for implementing server transports""" @abstractmethod def listen(self, *, loop, config): """This method is responsible for establishing a listening connection For example, in HTTP world this would mean acquiring a specific port, in a RabbitMQ world this would mean getting a connection and attaching to a queue. """ @abstractmethod async def start(self, request_handler: callable): """ This method does things needed before we run Like "listen" but you get the app object, and it happens after the fork. This is a corousine """ def get_client(self): raise NotImplementedError @abstractmethod def shutdown(self): """ Signals that we are shutting down """ class WorkerTransportABC(ABC): """ Abstract Base Class for implementing worker transports """ @abstractmethod def start(self): pass PK!vYr waspy/transports/transportabc.py""" A place for abstract transport base classes """ from abc import ABC, abstractmethod from .. import webtypes class ClientTransportABC(ABC): """Abstract Base Class for implementing client transports""" @abstractmethod async def make_request(self, service: str, method: str, path: str, body: bytes=None, query: str=None, headers: dict=None, correlation_id: str=None, content_type: str=None, timeout:int = 30, **kwargs) -> webtypes.Response: """ Method for actually making a request :param service: service to make request too :param method: HTTP method: GET/PUT/POST etc. :param path: routing path. Should support dots `foo.2.bars` or slashes `foo/2/bars` :param body: request body. Bytes-like object :param query: query string. Example: `foo=bar&cabbage=green` :param headers: Dictionary of headers :param correlation_id: :param content_type: example: `application/json` :param timeout: time to wait for response in seconds before getting an asyncio.TimeoutError :param kwargs: Should except **kwargs for compatability for other possible options on other transports (for example, http might need a `port` option) :return: """ class PubSubTransportABC(ABC): """ Abstract Base Class for implementing pubsub client transports""" class TransportABC(ABC): """ Abstract Base Class for implementing server transports""" @abstractmethod def listen(self, *, loop, config): """This method is responsible for establishing a listening connection For example, in HTTP world this would mean acquiring a specific port, in a RabbitMQ world this would mean getting a connection and attaching to a queue. """ @abstractmethod async def start(self, request_handler: callable): """ This method does things needed before we run Like "listen" but you get the app object, and it happens after the fork. This is a corousine """ def get_client(self): raise NotImplementedError @abstractmethod def shutdown(self): """ Signals that we are shutting down """ class WorkerTransportABC(ABC): """ Abstract Base Class for implementing worker transports """ @abstractmethod def start(self): pass PK!{jx!!waspy/webtypes.pyfrom collections import defaultdict from urllib import parse from http import HTTPStatus, cookies import uuid from aenum import extend_enum from waspy import exceptions from waspy.parser import parsers from .router import Methods extend_enum(HTTPStatus, 'INVALID_REQUEST', (430, 'Invalid Request', 'Request was syntactically sound, ' 'but failed validation rules')) class QueryParams: """ A dictionary that stores multiple values per key. this has all the normal dictionary methods, and works as normal but does not override a key when `add` is used, and also has `getall` """ __slots__ = ['mappings'] @classmethod def from_string(cls, string): query_params = QueryParams() qs = parse.parse_qsl(string) for k, v in qs: query_params.add(k, v) return query_params def __init__(self): self.mappings = defaultdict(list) def get(self, name, default=None): return self.mappings.get(name, [default])[0] def getall(self, name, default=None): return self.mappings.get(name, default) def __getitem__(self, key): try: return self.mappings.__getitem__(key)[0] except IndexError: raise KeyError('Invalid Key: {key}'.format(key)) def __setitem__(self, key, value): raise TypeError('MultiDict does not support item assignment. ' 'Use .add(k, v) instead.') def add(self, name, value): self.mappings[name].append(value) def __str__(self): return parse.urlencode(self.mappings, doseq=True) class Parseable: def __init__(self, *args, content_type=None, body=None, **kwargs): self._parser = None self.app = None self.original_body = body self._body = None self._raw_body = None self._content_type = None self.content_type = content_type self.ignore_content_type = kwargs.get('ignore_content_type', False) @property def content_type(self): if self._content_type: return self._content_type if self.app: return self.app.default_content_type @content_type.setter def content_type(self, value): if value: # Throw away charset for now. We will have to figure this out later. self._content_type = value.split(';')[0] @property def parser(self): if not self._parser: self._parser = parsers.get(self.content_type) if not self._parser and not self.ignore_content_type: raise exceptions.UnsupportedMediaType(self.content_type) return self._parser @property def body(self) -> dict: """ Decoded Body """ if self.ignore_content_type: return self.original_body if self._body is None and self.original_body is not None: if isinstance(self.original_body, bytes): self._body = self.parser.decode(self.original_body) else: self._body = self.original_body return self._body @body.setter def body(self, value): if isinstance(value, bytes): # Transports sometimes set the value after the response is created # Setting it to the original_body allows for lazy parsing self.original_body = value # Reset body and raw_body self._raw_body = None self._body = None elif isinstance(value, dict): self._body = value # Reset the raw_body self.original_body = value self._raw_body = None @property def raw_body(self) -> bytes: """ Encoded Body """ if self._raw_body is None and self.original_body is not None: if isinstance(self.original_body, dict): self._raw_body = self.parser.encode(self.original_body) if isinstance(self._raw_body, str): self._raw_body = self._raw_body.encode() elif isinstance(self.original_body, str): self._raw_body = self.original_body.encode() elif isinstance(self.original_body, bytes): self._raw_body = self.original_body else: self._raw_body = self.parser.encode(self.original_body) if isinstance(self._raw_body, str): self._raw_body = self._raw_body.encode() return self._raw_body def json(self) -> dict: """ Simply an alias now for getting the decoded body """ return self.body class Request(Parseable): def __init__(self, headers: dict = None, path: str = None, correlation_id: str = None, method: str = None, query_string: str = None, body: bytes=None, content_type=None): super().__init__( headers=headers, path=path, correlation_id=correlation_id, method=method, query_string=query_string, body=body, content_type=content_type) if not headers: headers = {} if not method: method = 'GET' self._method = None self.headers = headers self.path = path self.correlation_id = correlation_id or str(uuid.uuid4()) self.method = method # this is a property setter self.query_string = query_string self._query_params = None self.path_params = {} self._handler = None self.app = None self.content_type = content_type self._cookies = None @property def method(self): return self._method @method.setter def method(self, value): if isinstance(value, str): value = Methods(value.upper()) self._method = value @property def path_qs(self): # the path + the query string query = '?' + self.query_string if self.query_string else '' return self.path + query @property def cookies(self) -> dict: if self._cookies is None: self._cookies = {} raw = self.headers.get('cookie', None) if raw: cookie_manager = cookies.SimpleCookie(raw) self._cookies = {i: cookie_manager[i].value for i in cookie_manager} return self._cookies @property def query(self) -> QueryParams: # parse query string into a dictionary if not self._query_params: self._query_params = QueryParams.from_string(self.query_string) return self._query_params def __str__(self): query = '?' + self.query_string if self.query_string else '' return('' .format(method=self.method, path=self.path, query=query, id=id(self))) def __repr__(self): return (f'Request(headers={repr(self.headers)}, path={repr(self.path)}, ' f'correlation_id={self.correlation_id}, method={repr(self.method)}, ' f'query_string={self.query_string}, body={self.body}, ' f'content_type={self.content_type})') class Response(Parseable): def __init__(self, headers=None, correlation_id=None, body=None, status=HTTPStatus.OK, content_type=None, meta: dict=None): """ Response object :param headers: :param correlation_id: :param body: message body :param status: status code :param content_type: :param meta: Extra context information. Not to be returned through transport """ super().__init__( headers=headers, correlation_id=correlation_id, body=body, status=status, content_type=content_type, meta=meta) if not headers: headers = dict() if isinstance(status, int): # convert to enum status = HTTPStatus(status) if meta is None: meta = {} self.headers = headers self.correlation_id = correlation_id self.original_body = body self.status = status self.meta = meta self.app = None self._body = None self._raw_body = None def __str__(self): return('' .format(status=self.status, id=id(self))) PK!{jx!!waspy/webtypes.pyfrom collections import defaultdict from urllib import parse from http import HTTPStatus, cookies import uuid from aenum import extend_enum from waspy import exceptions from waspy.parser import parsers from .router import Methods extend_enum(HTTPStatus, 'INVALID_REQUEST', (430, 'Invalid Request', 'Request was syntactically sound, ' 'but failed validation rules')) class QueryParams: """ A dictionary that stores multiple values per key. this has all the normal dictionary methods, and works as normal but does not override a key when `add` is used, and also has `getall` """ __slots__ = ['mappings'] @classmethod def from_string(cls, string): query_params = QueryParams() qs = parse.parse_qsl(string) for k, v in qs: query_params.add(k, v) return query_params def __init__(self): self.mappings = defaultdict(list) def get(self, name, default=None): return self.mappings.get(name, [default])[0] def getall(self, name, default=None): return self.mappings.get(name, default) def __getitem__(self, key): try: return self.mappings.__getitem__(key)[0] except IndexError: raise KeyError('Invalid Key: {key}'.format(key)) def __setitem__(self, key, value): raise TypeError('MultiDict does not support item assignment. ' 'Use .add(k, v) instead.') def add(self, name, value): self.mappings[name].append(value) def __str__(self): return parse.urlencode(self.mappings, doseq=True) class Parseable: def __init__(self, *args, content_type=None, body=None, **kwargs): self._parser = None self.app = None self.original_body = body self._body = None self._raw_body = None self._content_type = None self.content_type = content_type self.ignore_content_type = kwargs.get('ignore_content_type', False) @property def content_type(self): if self._content_type: return self._content_type if self.app: return self.app.default_content_type @content_type.setter def content_type(self, value): if value: # Throw away charset for now. We will have to figure this out later. self._content_type = value.split(';')[0] @property def parser(self): if not self._parser: self._parser = parsers.get(self.content_type) if not self._parser and not self.ignore_content_type: raise exceptions.UnsupportedMediaType(self.content_type) return self._parser @property def body(self) -> dict: """ Decoded Body """ if self.ignore_content_type: return self.original_body if self._body is None and self.original_body is not None: if isinstance(self.original_body, bytes): self._body = self.parser.decode(self.original_body) else: self._body = self.original_body return self._body @body.setter def body(self, value): if isinstance(value, bytes): # Transports sometimes set the value after the response is created # Setting it to the original_body allows for lazy parsing self.original_body = value # Reset body and raw_body self._raw_body = None self._body = None elif isinstance(value, dict): self._body = value # Reset the raw_body self.original_body = value self._raw_body = None @property def raw_body(self) -> bytes: """ Encoded Body """ if self._raw_body is None and self.original_body is not None: if isinstance(self.original_body, dict): self._raw_body = self.parser.encode(self.original_body) if isinstance(self._raw_body, str): self._raw_body = self._raw_body.encode() elif isinstance(self.original_body, str): self._raw_body = self.original_body.encode() elif isinstance(self.original_body, bytes): self._raw_body = self.original_body else: self._raw_body = self.parser.encode(self.original_body) if isinstance(self._raw_body, str): self._raw_body = self._raw_body.encode() return self._raw_body def json(self) -> dict: """ Simply an alias now for getting the decoded body """ return self.body class Request(Parseable): def __init__(self, headers: dict = None, path: str = None, correlation_id: str = None, method: str = None, query_string: str = None, body: bytes=None, content_type=None): super().__init__( headers=headers, path=path, correlation_id=correlation_id, method=method, query_string=query_string, body=body, content_type=content_type) if not headers: headers = {} if not method: method = 'GET' self._method = None self.headers = headers self.path = path self.correlation_id = correlation_id or str(uuid.uuid4()) self.method = method # this is a property setter self.query_string = query_string self._query_params = None self.path_params = {} self._handler = None self.app = None self.content_type = content_type self._cookies = None @property def method(self): return self._method @method.setter def method(self, value): if isinstance(value, str): value = Methods(value.upper()) self._method = value @property def path_qs(self): # the path + the query string query = '?' + self.query_string if self.query_string else '' return self.path + query @property def cookies(self) -> dict: if self._cookies is None: self._cookies = {} raw = self.headers.get('cookie', None) if raw: cookie_manager = cookies.SimpleCookie(raw) self._cookies = {i: cookie_manager[i].value for i in cookie_manager} return self._cookies @property def query(self) -> QueryParams: # parse query string into a dictionary if not self._query_params: self._query_params = QueryParams.from_string(self.query_string) return self._query_params def __str__(self): query = '?' + self.query_string if self.query_string else '' return('' .format(method=self.method, path=self.path, query=query, id=id(self))) def __repr__(self): return (f'Request(headers={repr(self.headers)}, path={repr(self.path)}, ' f'correlation_id={self.correlation_id}, method={repr(self.method)}, ' f'query_string={self.query_string}, body={self.body}, ' f'content_type={self.content_type})') class Response(Parseable): def __init__(self, headers=None, correlation_id=None, body=None, status=HTTPStatus.OK, content_type=None, meta: dict=None): """ Response object :param headers: :param correlation_id: :param body: message body :param status: status code :param content_type: :param meta: Extra context information. Not to be returned through transport """ super().__init__( headers=headers, correlation_id=correlation_id, body=body, status=status, content_type=content_type, meta=meta) if not headers: headers = dict() if isinstance(status, int): # convert to enum status = HTTPStatus(status) if meta is None: meta = {} self.headers = headers self.correlation_id = correlation_id self.original_body = body self.status = status self.meta = meta self.app = None self._body = None self._raw_body = None def __str__(self): return('' .format(status=self.status, id=id(self))) PK!Wڜmwaspy/worker.pyfrom typing import Union, List, Iterable from .transports.transportabc import WorkerTransportABC from .transports.rabbitmqtransport import RabbitMQWorkerTransport class Task: pass class Worker: def __init__(self, transport: Union[WorkerTransportABC, Iterable[WorkerTransportABC]]): self.transport = transport def run(self): if self.transport is None: pass PK!Wڜmwaspy/worker.pyfrom typing import Union, List, Iterable from .transports.transportabc import WorkerTransportABC from .transports.rabbitmqtransport import RabbitMQWorkerTransport class Task: pass class Worker: def __init__(self, transport: Union[WorkerTransportABC, Iterable[WorkerTransportABC]]): self.transport = transport def run(self): if self.transport is None: pass PK!;U],],waspy-0.44.1.dist-info/LICENSE Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION 1. Definitions. "License" shall mean the terms and conditions for use, reproduction, and distribution as defined by Sections 1 through 9 of this document. "Licensor" shall mean the copyright owner or entity authorized by the copyright owner that is granting the License. "Legal Entity" shall mean the union of the acting entity and all other entities that control, are controlled by, or are under common control with that entity. For the purposes of this definition, "control" means (i) the power, direct or indirect, to cause the direction or management of such entity, whether by contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the outstanding shares, or (iii) beneficial ownership of such entity. "You" (or "Your") shall mean an individual or Legal Entity exercising permissions granted by this License. "Source" form shall mean the preferred form for making modifications, including but not limited to software source code, documentation source, and configuration files. "Object" form shall mean any form resulting from mechanical transformation or translation of a Source form, including but not limited to compiled object code, generated documentation, and conversions to other media types. "Work" shall mean the work of authorship, whether in Source or Object form, made available under the License, as indicated by a copyright notice that is included in or attached to the work (an example is provided in the Appendix below). "Derivative Works" shall mean any work, whether in Source or Object form, that is based on (or derived from) the Work and for which the editorial revisions, annotations, elaborations, or other modifications represent, as a whole, an original work of authorship. For the purposes of this License, Derivative Works shall not include works that remain separable from, or merely link (or bind by name) to the interfaces of, the Work and Derivative Works thereof. "Contribution" shall mean any work of authorship, including the original version of the Work and any modifications or additions to that Work or Derivative Works thereof, that is intentionally submitted to Licensor for inclusion in the Work by the copyright owner or by an individual or Legal Entity authorized to submit on behalf of the copyright owner. For the purposes of this definition, "submitted" means any form of electronic, verbal, or written communication sent to the Licensor or its representatives, including but not limited to communication on electronic mailing lists, source code control systems, and issue tracking systems that are managed by, or on behalf of, the Licensor for the purpose of discussing and improving the Work, but excluding communication that is conspicuously marked or otherwise designated in writing by the copyright owner as "Not a Contribution." "Contributor" shall mean Licensor and any individual or Legal Entity on behalf of whom a Contribution has been received by Licensor and subsequently incorporated within the Work. 2. Grant of Copyright License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable copyright license to reproduce, prepare Derivative Works of, publicly display, publicly perform, sublicense, and distribute the Work and such Derivative Works in Source or Object form. 3. Grant of Patent License. Subject to the terms and conditions of this License, each Contributor hereby grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free, irrevocable (except as stated in this section) patent license to make, have made, use, offer to sell, sell, import, and otherwise transfer the Work, where such license applies only to those patent claims licensable by such Contributor that are necessarily infringed by their Contribution(s) alone or by combination of their Contribution(s) with the Work to which such Contribution(s) was submitted. If You institute patent litigation against any entity (including a cross-claim or counterclaim in a lawsuit) alleging that the Work or a Contribution incorporated within the Work constitutes direct or contributory patent infringement, then any patent licenses granted to You under this License for that Work shall terminate as of the date such litigation is filed. 4. Redistribution. You may reproduce and distribute copies of the Work or Derivative Works thereof in any medium, with or without modifications, and in Source or Object form, provided that You meet the following conditions: (a) You must give any other recipients of the Work or Derivative Works a copy of this License; and (b) You must cause any modified files to carry prominent notices stating that You changed the files; and (c) You must retain, in the Source form of any Derivative Works that You distribute, all copyright, patent, trademark, and attribution notices from the Source form of the Work, excluding those notices that do not pertain to any part of the Derivative Works; and (d) If the Work includes a "NOTICE" text file as part of its distribution, then any Derivative Works that You distribute must include a readable copy of the attribution notices contained within such NOTICE file, excluding those notices that do not pertain to any part of the Derivative Works, in at least one of the following places: within a NOTICE text file distributed as part of the Derivative Works; within the Source form or documentation, if provided along with the Derivative Works; or, within a display generated by the Derivative Works, if and wherever such third-party notices normally appear. The contents of the NOTICE file are for informational purposes only and do not modify the License. You may add Your own attribution notices within Derivative Works that You distribute, alongside or as an addendum to the NOTICE text from the Work, provided that such additional attribution notices cannot be construed as modifying the License. You may add Your own copyright statement to Your modifications and may provide additional or different license terms and conditions for use, reproduction, or distribution of Your modifications, or for any such Derivative Works as a whole, provided Your use, reproduction, and distribution of the Work otherwise complies with the conditions stated in this License. 5. Submission of Contributions. Unless You explicitly state otherwise, any Contribution intentionally submitted for inclusion in the Work by You to the Licensor shall be under the terms and conditions of this License, without any additional terms or conditions. Notwithstanding the above, nothing herein shall supersede or modify the terms of any separate license agreement you may have executed with Licensor regarding such Contributions. 6. Trademarks. This License does not grant permission to use the trade names, trademarks, service marks, or product names of the Licensor, except as required for reasonable and customary use in describing the origin of the Work and reproducing the content of the NOTICE file. 7. Disclaimer of Warranty. Unless required by applicable law or agreed to in writing, Licensor provides the Work (and each Contributor provides its Contributions) on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied, including, without limitation, any warranties or conditions of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are solely responsible for determining the appropriateness of using or redistributing the Work and assume any risks associated with Your exercise of permissions under this License. 8. Limitation of Liability. In no event and under no legal theory, whether in tort (including negligence), contract, or otherwise, unless required by applicable law (such as deliberate and grossly negligent acts) or agreed to in writing, shall any Contributor be liable to You for damages, including any direct, indirect, special, incidental, or consequential damages of any character arising as a result of this License or out of the use or inability to use the Work (including but not limited to damages for loss of goodwill, work stoppage, computer failure or malfunction, or any and all other commercial damages or losses), even if such Contributor has been advised of the possibility of such damages. 9. Accepting Warranty or Additional Liability. While redistributing the Work or Derivative Works thereof, You may choose to offer, and charge a fee for, acceptance of support, warranty, indemnity, or other liability obligations and/or rights consistent with this License. However, in accepting such obligations, You may act only on Your own behalf and on Your sole responsibility, not on behalf of any other Contributor, and only if You agree to indemnify, defend, and hold each Contributor harmless for any liability incurred by, or claims asserted against, such Contributor by reason of your accepting any such warranty or additional liability. END OF TERMS AND CONDITIONS APPENDIX: How to apply the Apache License to your work. To apply the Apache License to your work, attach the following boilerplate notice, with the fields enclosed by brackets "{}" replaced with your own identifying information. (Don't include the brackets!) The text should be enclosed in the appropriate comment syntax for the file format. We also recommend that a file or class name and description of purpose be included on the same "printed page" as the copyright notice for easier identification within third-party archives. Copyright {yyyy} {name of copyright owner} Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. PK!HW"TTwaspy-0.44.1.dist-info/WHEEL A н#J."jm)Afb~ ڡ5 G7hiޅF4+-3ڦ/̖?XPK!H DJ.z waspy-0.44.1.dist-info/METADATAVmo6_qK?,,/ 0#mI(Z%=GIvDsZz /u N^ED%'nw󦪄m'4uNZ8i7*-4Ʈ Sɸ9n22IM5fشi!la$s] #eFV:OȊR~T=$60vBT*-XVBEwg*[/8F7 NY i$ǻ7Ω^ndiJjOs/|h27Ӵ q {ԙhdJTho%`@u2@Ds[a z XQҕyYKp,9[er38@jfM+{l&$哷{+驐ݻWiOʈ~zdƔ.z/]jUAp|hk0?י(zA[&$)G ];p E!S9v$~ T-V򎄦@m@#ov>6Ϋr A/R.-T)"Gfe&V;jhth2b>h0WӐ+LSf%{0S> ejb"p ttK2ei!0w$8sv #ivdZxVy, gU9tٸ(goЕ> ɣW5@#-H>~W@&dl20, e@sh]-?wZ𪒬D^6󇛠s^@EO$* CZ#-3 Ԭs1Q< %E(`f4rey}9HW.Z^xJ tL"n1a0Tlv_6p#CVl O> Ct٢ t'-gM9fg%aah[;t^nψG/0>:5`F]ḣ]5woQ'}S'`A?PK!HmKwaspy-0.44.1.dist-info/RECORDrH}%,"gDl("()ߝ5wm9E[c#8!y=;)sĨbV[KaӶo> 0k}1t$N\0Dk6&=͞GgRF2q?! LȦCAN"' *:9(%}P,@J$i;=9mY0u6:#mS1\0^tmVOe>q#i!Pو7b_/q}9TG¹YTP.n=GkPv|$yx]+ ^H .6Pvɐ7 L}IXB-N h$ yJE5((v럔~@F STRŘgb\&K9NiW{+Jb0rqMg+׵^C-3C|:Ss?=ɻan]GqA#/W8N F0:ُePgi۱CSܗlԲI|d=[w4A3jQO00OS3CĘJ c٩г7<-N PZfY\ s\~,t*D?b#/:X#\jY\;g= Xǹf1M|^gŒOc.1UY1B7(L?J;QOI>$:d-!o{ܺPS݅&p~J`M2@84ˎZQSĕg` tQʤr nbR*tYԍMHM1+(?~>{m# +C!N9'YlwL}3sO𚞦uQYE^8;Hnk-E!@/ԉH-WjY֖i.wƨҪ`8R0 ~;?2l*~Չי3 7;,2{uN_ 9<@a'"ǩj-ޑŶ: V{Ħ(qβP2}۲ ǥ9̞qrV-*Q.*ڤf=N^k{7PK!waspy/__init__.pyPK!waspy/__init__.pyPK! QR R waspy/_cors.pyPK! QR R  waspy/_cors.pyPK!ea&& waspy/app.pyPK!ea&& &<waspy/app.pyPK!NLcwaspy/client.pyPK!N+twaspy/client.pyPK!w waspy/configuration.pyPK!wwaspy/configuration.pyPK!TT Ƣwaspy/ctx.pyPK!TT Dwaspy/ctx.pyPK!HNN N £waspy/errorlogging.pyPK!HNN N Cwaspy/errorlogging.pyPK!V+~~ļwaspy/exceptions.pyPK!V+~~swaspy/exceptions.pyPK!"waspy/listeners/__init__.pyPK!aspy/listeners/__init__.pyPK!ZՍn n $waspy/listeners/rabbitmq_listener.pyPK!ZՍn n $Dwaspy/listeners/rabbitmq_listener.pyPK!)waspy/listeners/transport_listener_abc.pyPK!)@waspy/listeners/transport_listener_abc.pyPK!xi""waspy/parser.pyPK!xi""waspy/parser.pyPK!l!!*waspy/router.pyPK!l!!.waspy/router.pyPK!<_Q23waspy/transports/__init__.pyPK!<_Q4waspy/transports/__init__.pyPK!ׇ33!5waspy/transports/httptransport.pyPK!ׇ33!iwaspy/transports/httptransport.pyPK!s"nwaspy/transports/rabbit_patches.pyPK!s"waspy/transports/rabbit_patches.pyPK!Z(G@GG%waspy/transports/rabbitmqtransport.pyPK!Z(G@GG%owaspy/transports/rabbitmqtransport.pyPK!&ee!2Ywaspy/transports/testtransport.pyPK!&ee!`waspy/transports/testtransport.pyPK!vYr zhwaspy/transports/transportabc.pyPK!vYr rwaspy/transports/transportabc.pyPK!{jx!!|waspy/webtypes.pyPK!{jx!!waspy/webtypes.pyPK!Wڜm.waspy/worker.pyPK!Wڜmwaspy/worker.pyPK!;U],],waspy-0.44.1.dist-info/LICENSEPK!HW"TTwaspy-0.44.1.dist-info/WHEELPK!H DJ.z 7waspy-0.44.1.dist-info/METADATAPK!HmKwaspy-0.44.1.dist-info/RECORDPK.. z