PKrN*tllhappyly/__init__.py"""Conveniently separate your business logic from messaging stuff.""" # flake8: noqa F401 import logging __version__ = '0.9.0' from .listening import Executor, BaseListener from .schemas import Schema from .caching import Cacher from .serialization import Serializer, Deserializer from .handling import Handler, DUMMY_HANDLER from .exceptions import StopPipeline def _welcome(): import sys sys.stdout.write(f'Using happyly v{__version__}.\n') def _setup_warnings(): import warnings for warning_type in PendingDeprecationWarning, DeprecationWarning: warnings.filterwarnings( 'always', category=warning_type, module=r'^{0}\.'.format(__name__) ) def _setup_logging(): logging.getLogger(__name__).setLevel(logging.INFO) _welcome() _setup_warnings() _setup_logging() del _welcome del _setup_warnings del _setup_logging PK Nh happyly/exceptions.pyfrom attr import attrs @attrs(auto_attribs=True, auto_exc=True) # type: ignore class StopPipeline(Exception): """ This exception should be raised to stop a pipeline. After raising it, :meth:`Executor.on_stopped` will be called. """ reason: str = '' @attrs(auto_exc=True) # type: ignore class FetchedNoResult(Exception): """ Exception thrown by :meth:`Executor.run_for_result` when it is unable to fetch a result """ pass PK N!happyly/_deprecations/__init__.pyPKNl((happyly/_deprecations/utils.pyimport warnings from typing import Type, Union def will_be_removed( deprecated_name: str, use_instead: Union[str, Type], removing_in_version: str, stacklevel=2, ): new_class_name = ( use_instead.__name__ # type: ignore if isinstance(use_instead, Type) # type: ignore else use_instead ) warnings.warn( f"Please use {new_class_name} instead, " f"{deprecated_name} will be removed in happyly v{removing_in_version}.", DeprecationWarning, stacklevel=stacklevel, ) PKNݓAffhappyly/caching/__init__.pyfrom .cacher import Cacher # noqa: F401 # this is a deprecated module # it will be removed in v0.11 PK N6yyhappyly/caching/cacher.pyfrom abc import ABC, abstractmethod from typing import Any _no_default_impl = NotImplementedError('No default implementation for class Cacher') class Cacher(ABC): """ Abstract base class which defines interface of any caching component to be used via :class:`.CacheByRequestIdMixin` or similar mixin. """ @abstractmethod def add(self, data: Any, key: str): """ Add the provided data to cache and store it by the provided key. """ raise _no_default_impl @abstractmethod def remove(self, key: str): """ Remove data from cache which is stored by the provided key. """ raise _no_default_impl @abstractmethod def get(self, key: str): """ Returns data which is stored in cache by the provided key. """ raise _no_default_impl PKN55happyly/caching/mixins.pyimport json import warnings from typing import Any, Mapping, Optional from happyly.caching.cacher import Cacher class CacheByRequestIdMixin: """ Mixin which adds caching functionality to Listener. Utilizes notions of listener's topic and request id of message -- otherwise will not work. To be used via multiple inheritance. For example, given some component `SomeListener` you can define its caching equivalent by defining `SomeCachedListener` which inherits from both `SomeListener` and :class:`.CacheByRequestIdMixin`. """ def __init__(self, cacher: Cacher): warnings.warn( 'CacheByRequestIdMixin will be removed in happyly v0.11.0', DeprecationWarning, ) self.cacher = cacher def on_received(self, message: Any): super().on_received(message) try: req_id = self._get_req_id(message) except Exception: pass else: data = json.dumps( {'topic': self.from_topic, 'data': json.loads(message.data)} ) self.cacher.add(data, key=req_id) def _get_req_id(self, message: Any) -> str: assert self.deserializer is not None attribtues = self.deserializer.deserialize(message) return attribtues[self.deserializer.request_id_field] def _rm(self, parsed_message: Mapping[str, Any]): assert self.deserializer is not None self.cacher.remove(parsed_message[self.deserializer.request_id_field]) def on_published( self, original_message: Any, parsed_message: Optional[Mapping[str, Any]], result ): super().on_published(original_message, parsed_message, result) if parsed_message is not None: self._rm(parsed_message) def on_deserialization_failed(self, message: Any, error: Exception): super().on_deserialization_failed(message, error) try: req_id = self._get_req_id(message) except Exception: pass else: self.cacher.remove(key=req_id) PKNt=M!happyly/google_pubsub/__init__.py# flake8: noqa F401 from .high_level import ( GoogleSimpleSender, GoogleCachedReceiveAndReply, GoogleCachedReceiver, GoogleLateAckReceiver, GoogleLateAckReceiveAndReply, GoogleBaseReceiver, GoogleBaseReceiveAndReply, ) from .redis_cacher import RedisCacher from .deserializers import JSONDeserializerWithRequestIdRequired from .publishers import GooglePubSubPublisher from .subscribers import GooglePubSubSubscriber PK Ndj&happyly/google_pubsub/deserializers.pyfrom typing import Mapping, Any import json from attr import attrs import marshmallow from happyly.serialization import Deserializer @attrs(auto_attribs=True, frozen=True) class JSONDeserializerWithRequestIdRequired(Deserializer): """ Deserializer for Google Pub/Sub messages which expects a message of certain schema to be written in `message.data` as JSON encoded into binary data with utf-8. Schema used with this serializer must define some field which is used as request id (you can specify which one in constructor). If `JSONDeserializerWithRequestIdRequired` fails to deserialize some message, you can use `build_error_result` to fetch request id and provide error message. """ schema: marshmallow.Schema request_id_field: str = 'request_id' status_field: str = 'status' error_field: str = 'error' _status_error: str = 'ERROR' def deserialize(self, message: Any) -> Mapping[str, Any]: """ Loads message attributes from `message.data`, expects it to be a JSON which corresponds `self.schema` encoded with utf-8. """ data = message.data.decode('utf-8') deserialized, _ = self.schema.loads(data) return deserialized def build_error_result(self, message: Any, error: Exception) -> Mapping[str, Any]: """ Provides a fallback result when `deserialize` fails. Returns a dict with attributes: * * * Field names can be specified in constructor. If request id cannot be fetched, it is set to an empty string. """ attributes = json.loads(message.data) try: return { self.request_id_field: attributes[self.request_id_field], self.status_field: self._status_error, self.error_field: repr(error), } except KeyError as e: return { self.request_id_field: '', self.status_field: self._status_error, self.error_field: f'{repr(e)}: ' f'Message contains no {self.request_id_field}', } PKN۪[#happyly/google_pubsub/publishers.pyfrom typing import Any from happyly.pubsub import BasePublisher class GooglePubSubPublisher(BasePublisher): """ Publisher for Google Pub/Sub. Synchronously publishes the provided message to given topic. """ def __init__(self, project: str, to_topic: str): try: from google.cloud import pubsub_v1 except ImportError: raise ImportError( 'Please install google-cloud-pubsub to use this component' ) super().__init__() self.project = project self.to_topic = to_topic self._publisher_client = pubsub_v1.PublisherClient() def publish(self, serialized_message: Any): future = self._publisher_client.publish( f'projects/{self.project}/topics/{self.to_topic}', serialized_message ) try: future.result() return except Exception as e: raise e PKN9 n%happyly/google_pubsub/redis_cacher.pyimport logging import warnings from happyly.caching.cacher import Cacher _LOGGER = logging.getLogger(__name__) class RedisCacher(Cacher): def __init__(self, host: str, port: int, prefix: str = ''): warnings.warn( 'RedisCacher will be removed in happyly v0.11.0', DeprecationWarning ) try: import redis except ImportError as e: raise ImportError('Please install redis>=3.0 to use this feature.') from e self.prefix = prefix self.client = redis.StrictRedis(host=host, port=port) _LOGGER.info( f'Cache was successfully initialized with Redis client ({host}:{port})' ) if self.prefix != '': _LOGGER.info(f'Using prefix {self.prefix}') def add(self, data: str, key: str): self.client.hset(self.prefix, key, data) _LOGGER.info(f'Cached message with id {key}') def remove(self, key: str): self.client.hdel(self.prefix, key) _LOGGER.info(f'Message with id {key} was removed from cache') def get(self, key: str): self.client.hget(self.prefix, key) def get_all(self): keys = self.client.hkeys(self.prefix) return [self.client.hget(self.prefix, k) for k in keys] PKND $happyly/google_pubsub/subscribers.pyimport logging from typing import Callable, Any from happyly.pubsub import SubscriberWithAck _LOGGER = logging.getLogger(__name__) class GooglePubSubSubscriber(SubscriberWithAck): def __init__(self, project: str, subscription_name: str): try: from google.cloud import pubsub_v1 except ImportError: raise ImportError( 'Please install google-cloud-pubsub to use this component' ) super().__init__() self.project = project self.subscription_name = subscription_name s = pubsub_v1.SubscriberClient() self._subscription_path = s.subscription_path( self.project, self.subscription_name ) self._subscription_client = s def subscribe(self, callback: Callable[[Any], Any]): _LOGGER.info(f'Starting to listen to {self.subscription_name}') return self._subscription_client.subscribe(self._subscription_path, callback) def ack(self, message): message.ack() PKN@__,happyly/google_pubsub/high_level/__init__.py# flake8: noqa F401 from .simple import GoogleSimpleSender from .with_cache import GoogleCachedReceiveAndReply, GoogleCachedReceiver from .late_ack import GoogleLateAckReceiver, GoogleLateAckReceiveAndReply from .early_ack import GoogleEarlyAckReceiver, GoogleEarlyAckReceiveAndReply from .base import GoogleBaseReceiver, GoogleBaseReceiveAndReply PKN(happyly/google_pubsub/high_level/base.pyimport logging from typing import Optional, Union, Any, Mapping import marshmallow from happyly._deprecations.utils import will_be_removed from happyly.logs.request_id import RequestIdLogger from happyly.serialization import DUMMY_SERDE from happyly.serialization.json import BinaryJSONSerializerForSchema from ..subscribers import GooglePubSubSubscriber from ..deserializers import JSONDeserializerWithRequestIdRequired from ..publishers import GooglePubSubPublisher from happyly import Handler, Serializer from happyly.listening.listener import ExecutorWithAck _LOGGER = logging.getLogger(__name__) def _format_message(message): return f'data: {message.data}, attributes: {message.attributes}' class GooglePubSubExecutorWithRequestId( ExecutorWithAck[ JSONDeserializerWithRequestIdRequired, Union[None, GooglePubSubPublisher], Serializer, ] ): """ ExecutorWithAck subtype which adds advanced logging based on topic and request id. """ def __init__( self, subscriber: GooglePubSubSubscriber, handler: Handler, deserializer: JSONDeserializerWithRequestIdRequired, serializer: BinaryJSONSerializerForSchema = None, publisher: Optional[GooglePubSubPublisher] = None, from_topic: str = '', ): self.from_topic = from_topic super().__init__( subscriber=subscriber, publisher=publisher, handler=handler, deserializer=deserializer, serializer=serializer if serializer is not None else DUMMY_SERDE, ) def on_received(self, original_message: Any): logger = RequestIdLogger(_LOGGER, self.from_topic) logger.info(f"Received message: {_format_message(original_message)}") def on_deserialized( self, original_message: Any, deserialized_message: Mapping[str, Any] ): assert self.deserializer is not None request_id = deserialized_message[self.deserializer.request_id_field] logger = RequestIdLogger(_LOGGER, self.from_topic, request_id) logger.debug( f"Message successfully deserialized into attributes: {deserialized_message}" ) def on_deserialization_failed(self, original_message: Any, error: Exception): logger = RequestIdLogger(_LOGGER, self.from_topic) logger.exception( f"Was not able to deserialize the following message: " f"{_format_message(original_message)}" ) def on_handled( self, original_message: Any, deserialized_message: Mapping[str, Any], result ): assert self.deserializer is not None request_id = deserialized_message[self.deserializer.request_id_field] logger = RequestIdLogger(_LOGGER, self.from_topic, request_id) logger.info(f"Message handled, result {result}") def on_handling_failed( self, original_message: Any, deserialized_message: Mapping[str, Any], error: Exception, ): assert self.deserializer is not None request_id = deserialized_message[self.deserializer.request_id_field] logger = RequestIdLogger(_LOGGER, self.from_topic, request_id) logger.info(f'Failed to handle message, error {error}') def on_published( self, original_message: Any, deserialized_message: Optional[Mapping[str, Any]], result, serialized_message, ): assert self.deserializer is not None request_id = '' if deserialized_message is not None: request_id = deserialized_message[self.deserializer.request_id_field] logger = RequestIdLogger(_LOGGER, self.from_topic, request_id) logger.info(f"Published serialized result: {serialized_message}") def on_publishing_failed( self, original_message: Any, deserialized_message: Optional[Mapping[str, Any]], result, serialized_message, error: Exception, ): assert self.deserializer is not None request_id = '' if deserialized_message is not None: request_id = deserialized_message[self.deserializer.request_id_field] logger = RequestIdLogger(_LOGGER, self.from_topic, request_id) logger.exception(f"Failed to publish result: {serialized_message}") def on_acknowledged(self, message: Any): assert self.deserializer is not None try: msg: Mapping = self.deserializer.deserialize(message) req_id = msg[self.deserializer.request_id_field] except Exception: req_id = '' logger = RequestIdLogger(_LOGGER, self.from_topic, req_id) logger.info('Message acknowledged.') def on_finished(self, original_message: Any, error: Optional[Exception]): assert self.deserializer is not None try: msg: Mapping = self.deserializer.deserialize(original_message) req_id = msg[self.deserializer.request_id_field] except Exception: req_id = '' logger = RequestIdLogger(_LOGGER, self.from_topic, req_id) logger.info('Pipeline execution finished.') def on_stopped(self, original_message: Any, reason: str = ''): assert self.deserializer is not None try: msg: Mapping = self.deserializer.deserialize(original_message) req_id = msg[self.deserializer.request_id_field] except Exception: req_id = '' logger = RequestIdLogger(_LOGGER, self.from_topic, req_id) s = "." if reason == "" else f" due to the reason: {reason}." logger.info(f'Stopped pipeline{s}') class _BaseGoogleListenerWithRequestIdLogger(GooglePubSubExecutorWithRequestId): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) will_be_removed( '_BaseGoogleListenerWithRequestIdLogger', 'ExecutorWithAck or GooglePubSubExecutorWithRequestId', '0.11.0', ) class GoogleBaseReceiver(_BaseGoogleListenerWithRequestIdLogger): def __init__( self, input_schema: marshmallow.Schema, from_subscription: str, project: str, handler: Handler, from_topic: str = '', ): will_be_removed( 'GoogleBaseReceiver', 'ExecutorWithAck or GooglePubSubExecutorWithRequestId', '0.11.0', ) subscriber = GooglePubSubSubscriber( project=project, subscription_name=from_subscription ) deserializer = JSONDeserializerWithRequestIdRequired(schema=input_schema) super().__init__( subscriber=subscriber, handler=handler, deserializer=deserializer, from_topic=from_topic, ) class GoogleBaseReceiveAndReply(_BaseGoogleListenerWithRequestIdLogger): def __init__( self, handler: Handler, input_schema: marshmallow.Schema, from_subscription: str, output_schema: marshmallow.Schema, to_topic: str, project: str, from_topic: str = '', ): will_be_removed( 'GoogleBaseReceiveAndReply', 'ExecutorWithAck or GooglePubSubExecutorWithRequestId', '0.11.0', ) subscriber = GooglePubSubSubscriber( project=project, subscription_name=from_subscription ) deserializer = JSONDeserializerWithRequestIdRequired(schema=input_schema) serializer = BinaryJSONSerializerForSchema(schema=output_schema) publisher = GooglePubSubPublisher(project=project, to_topic=to_topic) super().__init__( handler=handler, deserializer=deserializer, subscriber=subscriber, serializer=serializer, publisher=publisher, from_topic=from_topic, ) PKN>wv66-happyly/google_pubsub/high_level/early_ack.pyfrom typing import Optional, Any from happyly._deprecations.utils import will_be_removed from .base import GoogleBaseReceiver, GoogleBaseReceiveAndReply class GoogleEarlyAckReceiver(GoogleBaseReceiver): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) will_be_removed( 'GoogleEarlyAckReceiver', 'ExecutorWithAck or GooglePubSubExecutorWithRequestId', '0.11.0', ) def _fetch_deserialized_and_result(self, message: Optional[Any]): self.ack(message) super()._fetch_deserialized_and_result(message) class GoogleEarlyAckReceiveAndReply(GoogleBaseReceiveAndReply): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) will_be_removed( 'GoogleEarlyAckReceiveAndReply', 'ExecutorWithAck or GooglePubSubExecutorWithRequestId', '0.11.0', ) def _fetch_deserialized_and_result(self, message: Optional[Any]): self.ack(message) super()._fetch_deserialized_and_result(message) PKNqMZZ,happyly/google_pubsub/high_level/late_ack.pyfrom typing import Optional, Any from happyly._deprecations.utils import will_be_removed from ..high_level.base import GoogleBaseReceiver, GoogleBaseReceiveAndReply class GoogleLateAckReceiver(GoogleBaseReceiver): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) will_be_removed( 'GoogleLateAckReceiver', 'ExecutorWithAck or GooglePubSubExecutorWithRequestId', '0.11.0', ) def on_finished(self, original_message: Any, error: Optional[Exception]): self.ack(original_message) super().on_finished(original_message, error) class GoogleLateAckReceiveAndReply(GoogleBaseReceiveAndReply): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) will_be_removed( 'GoogleLateAckReceiveAndReply', 'ExecutorWithAck or GooglePubSubExecutorWithRequestId', '0.11.0', ) def on_finished(self, original_message: Any, error: Optional[Exception]): self.ack(original_message) super().on_finished(original_message, error) PKN *happyly/google_pubsub/high_level/simple.pyfrom typing import Union, Optional import marshmallow from happyly._deprecations.utils import will_be_removed from happyly.handling.dummy_handler import DUMMY_HANDLER from ..deserializers import JSONDeserializerWithRequestIdRequired from ..publishers import GooglePubSubPublisher from happyly.serialization.json import BinaryJSONSerializerForSchema from happyly.handling import Handler from happyly.listening.executor import Executor class GoogleSimpleSender( Executor[ Union[None, JSONDeserializerWithRequestIdRequired], GooglePubSubPublisher, BinaryJSONSerializerForSchema, None, ] ): def __init__( self, output_schema: marshmallow.Schema, to_topic: str, project: str, handler: Handler = DUMMY_HANDLER, input_schema: Optional[marshmallow.Schema] = None, ): will_be_removed( deprecated_name='GoogleSimpleSender', use_instead='ExecutorWithAck or GooglePubSubExecutorWithRequestId', removing_in_version='0.11.0', ) if input_schema is None: deserializer = None else: deserializer = JSONDeserializerWithRequestIdRequired(schema=input_schema) publisher = GooglePubSubPublisher(project=project, to_topic=to_topic) serializer = BinaryJSONSerializerForSchema(schema=output_schema) super().__init__( publisher=publisher, handler=handler, deserializer=deserializer, serializer=serializer, ) PKNzͥ.happyly/google_pubsub/high_level/with_cache.pyimport marshmallow from happyly._deprecations.utils import will_be_removed from .early_ack import GoogleEarlyAckReceiveAndReply, GoogleEarlyAckReceiver from happyly.caching.cacher import Cacher from happyly.caching.mixins import CacheByRequestIdMixin from happyly.handling import Handler class GoogleCachedReceiveAndReply(CacheByRequestIdMixin, GoogleEarlyAckReceiveAndReply): def __init__( self, handler: Handler, input_schema: marshmallow.Schema, from_subscription: str, from_topic: str, output_schema: marshmallow.Schema, to_topic: str, project: str, cacher: Cacher, ): will_be_removed( 'GoogleCachedReceiveAndReply', 'ExecutorWithAck or GooglePubSubExecutorWithRequestId', '0.11.0', ) GoogleEarlyAckReceiveAndReply.__init__( self, handler=handler, input_schema=input_schema, from_subscription=from_subscription, output_schema=output_schema, to_topic=to_topic, project=project, from_topic=from_topic, ) CacheByRequestIdMixin.__init__(self, cacher) class GoogleCachedReceiver(CacheByRequestIdMixin, GoogleEarlyAckReceiver): def __init__( self, handler: Handler, input_schema: marshmallow.Schema, from_subscription: str, from_topic: str, project: str, cacher: Cacher, ): will_be_removed( 'GoogleCachedReceiver', 'ExecutorWithAck or GooglePubSubExecutorWithRequestId', '0.11.0', ) GoogleEarlyAckReceiver.__init__( self, handler=handler, input_schema=input_schema, from_subscription=from_subscription, project=project, from_topic=from_topic, ) CacheByRequestIdMixin.__init__(self, cacher) PK N.bbhappyly/handling/__init__.pyfrom .handler import Handler # noqa: F401 from .dummy_handler import DUMMY_HANDLER # noqa: F401 PK NBK!happyly/handling/dummy_handler.pyfrom typing import Mapping, Any from happyly.handling.handler import Handler class _DummyHandler(Handler): def handle(self, message: Mapping[str, Any]): return message def on_handling_failed(self, message: Mapping[str, Any], error: Exception): raise error DUMMY_HANDLER: _DummyHandler = _DummyHandler() """ Handler which just returns provided message attributes (kind of an "identity function") """ PK۴N)happyly/handling/handler.pyfrom abc import ABC, abstractmethod from typing import Mapping, Any, Optional _no_base_impl = NotImplementedError('No default implementation in base Handler class') class Handler(ABC): """ A class containing logic to handle a parsed message. """ @abstractmethod def handle(self, message: Mapping[str, Any]) -> Optional[Mapping[str, Any]]: """ Applies logic using a provided message, optionally gives back one or more results. Each result consists of message attributes which can be serialized and sent. When fails, calls :meth:`on_handling_failed` :param message: A parsed message as a dictionary of attributes :return: None if no result is extracted from handling, a dictionary of attributes for single result """ raise _no_base_impl @abstractmethod def on_handling_failed( self, message: Mapping[str, Any], error: Exception ) -> Optional[Mapping[str, Any]]: """ Applies fallback logic using a provided message when :meth:`handle` fails, optionally gives back one or more results. Enforces users of :class:`Handler` class to provide explicit strategy for errors. If you want to propagate error further to the underlying Executor/Handler, just re-raise an `error` here:: def on_handling_failed(self, message, error): raise error :param message: A parsed message as a dictionary of attributes :param error: Error raised by :meth:`handle` :return: None if no result is extracted from handling, a dictionary of attributes for single result """ raise _no_base_impl def __call__(self, message: Mapping[str, Any]) -> Optional[Mapping[str, Any]]: try: return self.handle(message) except Exception as e: return self.on_handling_failed(message, e) PK NAP!happyly/listening/__init__.py# flake8: noqa F401 from .listener import BaseListener, EarlyAckListener, LateAckListener, ListenerWithAck from .executor import Executor PKNfPMFFhappyly/listening/executor.pyimport logging import threading import queue from collections import namedtuple from types import FunctionType from typing import ( Mapping, Any, Optional, TypeVar, Generic, Tuple, Union, Callable, Iterator, ) from happyly.utils import generator_check from happyly.exceptions import StopPipeline, FetchedNoResult from happyly.handling.dummy_handler import DUMMY_HANDLER from happyly.handling import Handler from happyly.serialization.deserializer import Deserializer from happyly.serialization.serializer import Serializer from happyly.pubsub import BasePublisher from happyly.serialization import DUMMY_SERDE from happyly.pubsub import BaseSubscriber _LOGGER = logging.getLogger(__name__) D = TypeVar("D", bound=Deserializer) P = TypeVar("P", bound=BasePublisher) SE = TypeVar("SE", bound=Serializer) S = TypeVar('S', bound=BaseSubscriber) _Result = Optional[Mapping[str, Any]] ResultAndDeserialized = namedtuple('ResultAndDeserialized', 'result deserialized') HandlerClsOrFn = Union[Handler, Callable[[Mapping[str, Any]], _Result]] def _deser_converter(deserializer: Union[Deserializer, Callable]): if isinstance(deserializer, FunctionType): return Deserializer.from_function(deserializer) elif isinstance(deserializer, Deserializer): return deserializer else: raise TypeError def _publ_converter(publisher: Union[BasePublisher, Callable]): if isinstance(publisher, FunctionType): return BasePublisher.from_function(publisher) elif isinstance(publisher, BasePublisher): return publisher else: raise TypeError def _ser_converter(serializer: Union[Serializer, Callable]): if isinstance(serializer, FunctionType): return Serializer.from_function(serializer) elif isinstance(serializer, Serializer): return serializer else: raise TypeError class Executor(Generic[D, P, SE, S]): """ Component which is able to run handler as a part of more complex pipeline. Implements managing of stages inside the pipeline (deserialization, handling, serialization, publishing) and introduces callbacks between the stages which can be easily overridden. Executor does not implement stages themselves, it takes internal implementation of stages from corresponding components: :class:`Handler`, :class:`Deserializer`, :class:`Publisher`. It means that :class:`Executor` is universal and can work with any serialization/messaging technology depending on concrete components provided to executor's constructor. """ handler: HandlerClsOrFn """ Provides implementation of handling stage to Executor. """ deserializer: D # Why type:ignore? Because DUMMY_SERDE is a subclass of Deserializer # but not necessarily subclass of whatever D will be in runtime. """ Provides implementation of deserialization stage to Executor. If not present, no deserialization is performed. """ publisher: Optional[P] """ Provides implementation of serialization and publishing stages to Executor. If not present, no publishing is performed. """ serializer: SE subscriber: Optional[S] def __init__( self, handler: HandlerClsOrFn = DUMMY_HANDLER, deserializer: Optional[Union[D, Callable]] = None, publisher: Optional[Union[P, Callable]] = None, serializer: Optional[Union[SE, Callable]] = None, subscriber: Optional[S] = None, ): self.handler = handler # type: ignore if deserializer is None: self.deserializer = DUMMY_SERDE # type: ignore else: self.deserializer = _deser_converter(deserializer) if publisher is None: self.publisher = None else: self.publisher = _publ_converter(publisher) if serializer is None: self.serializer = DUMMY_SERDE # type: ignore else: self.serializer = _ser_converter(serializer) self.subscriber = subscriber self.publisher_queue: queue.Queue = queue.Queue() def on_received(self, original_message: Any): """ Callback which is called as soon as pipeline is run. Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param original_message: Message as it has been received, without any deserialization """ _LOGGER.info(f"Received message: {original_message}") def on_deserialized( self, original_message: Any, deserialized_message: Mapping[str, Any] ): """ Callback which is called right after message was deserialized successfully. Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param original_message: Message as it has been received, without any deserialization :param deserialized_message: Message attributes after deserialization """ _LOGGER.info( 'Message successfully deserialized into attributes: ' f'{deserialized_message}' ) def on_deserialization_failed(self, original_message: Any, error: Exception): """ Callback which is called right after deserialization failure. Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param original_message: Message as it has been received, without any deserialization :param error: exception object which was raised """ _LOGGER.exception('') _LOGGER.error( f"Was not able to deserialize the following message: {original_message}" ) def on_handled( self, original_message: Any, deserialized_message: Mapping[str, Any], result: Optional[Mapping[str, Any]], ): """ Callback which is called right after message was handled (successfully or not, but without raising an exception). Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param original_message: Message as it has been received, without any deserialization :param deserialized_message: Message attributes after deserialization :param result: Result fetched from handler """ _LOGGER.info(f"Message handled, result: {result}.") def on_handling_failed( self, original_message: Any, deserialized_message: Mapping[str, Any], error: Exception, ): """ Callback which is called if handler's `on_handling_failed` raises an exception. Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param original_message: Message as it has been received, without any deserialization :param deserialized_message: Message attributes after deserialization :param error: exception object which was raised """ _LOGGER.exception('') _LOGGER.error(f'Handler raised an exception.') def on_serialized( self, original_message: Any, deserialized_message: Optional[Mapping[str, Any]], result: _Result, serialized_message: Any, ): _LOGGER.debug('Serialized message.') def on_serialization_failed( self, original: Any, deserialized: Optional[Mapping[str, Any]], result: _Result, error: Exception, ): _LOGGER.exception('') _LOGGER.error('Was not able to deserialize message.') def on_published( self, original_message: Any, deserialized_message: Optional[Mapping[str, Any]], result: _Result, serialized_message: Any, ): """ Callback which is called right after message was published successfully. Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param original_message: Message as it has been received, without any deserialization :param deserialized_message: Message attributes after deserialization :param result: Result fetched from handler """ _LOGGER.info(f"Published result: {result}") def on_publishing_failed( self, original_message: Any, deserialized_message: Optional[Mapping[str, Any]], result: _Result, serialized_message: Any, error: Exception, ): """ Callback which is called when publisher fails to publish. Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param original_message: Message as it has been received, without any deserialization :param deserialized_message: Message attributes after deserialization :param result: Result fetched from handler :param error: exception object which was raised """ _LOGGER.exception('') _LOGGER.error(f"Failed to publish result: {result}") def on_finished(self, original_message: Any, error: Optional[Exception]): """ Callback which is called when pipeline finishes its execution. Is guaranteed to be called unless pipeline is stopped via StopPipeline. :param original_message: Message as it has been received, without any deserialization :param error: exception object which was raised or None """ _LOGGER.info('Pipeline execution finished.') def on_stopped(self, original_message: Any, reason: str = ''): """ Callback which is called when pipeline is stopped via :exc:`.StopPipeline` :param original_message: Message as it has been received, without any deserialization :param reason: message describing why the pipeline stopped """ s = "." if reason == "" else f" due to the reason: {reason}." _LOGGER.info(f'Stopped pipeline{s}') def _try_publish( self # original: Any, # parsed: Optional[Mapping[str, Any]], # result: _Result, # serialized: Any, ): original, parsed, result, serialized = self.publisher_queue.get() assert self.publisher is not None try: self.publisher.publish(serialized) except Exception as e: self.on_publishing_failed( original_message=original, deserialized_message=parsed, result=result, serialized_message=serialized, error=e, ) self.publisher_queue.task_done() raise e from e else: self.on_published( original_message=original, deserialized_message=parsed, result=result, serialized_message=serialized, ) self.publisher_queue.task_done() def _fetch_deserialized_and_result( self, message: Optional[Any] ) -> Iterator[ResultAndDeserialized]: try: deserialized = self._deserialize(message) except StopPipeline as e: raise e from e except Exception as e: yield ResultAndDeserialized( result=self._build_error_result(message, e), deserialized=None ) return for result in self._handle(message, deserialized): yield ResultAndDeserialized(result=result, deserialized=deserialized) def _deserialize(self, message: Optional[Any]): try: deserialized = self.deserializer.deserialize(message) except Exception as e: self.on_deserialization_failed(original_message=message, error=e) raise e from e else: self.on_deserialized( original_message=message, deserialized_message=deserialized ) return deserialized def _build_error_result(self, message: Any, error: Exception): try: error_result = self.deserializer.build_error_result(message, error) except Exception as new_e: _LOGGER.exception('') _LOGGER.error("Deserialization failed and error result cannot be built.") raise new_e from new_e return error_result def _handle(self, message: Optional[Any], deserialized: Mapping[str, Any]): try: if generator_check.is_generator(self.handler): # type: ignore for result in self.handler(deserialized): # type: ignore self.on_handled( # type: ignore original_message=message, deserialized_message=deserialized, result=result, ) yield result else: result = self.handler(deserialized) # type: ignore self.on_handled( original_message=message, deserialized_message=deserialized, result=result, ) yield result return except Exception as e: self.on_handling_failed( original_message=message, deserialized_message=deserialized, error=e ) raise e from e def _serialize( self, original_message: Optional[Any], parsed_message: Optional[Mapping[str, Any]], result: Mapping[str, Any], ) -> Any: try: serialized = self.serializer.serialize(result) except Exception as e: self.on_serialization_failed( original=original_message, deserialized=parsed_message, result=result, error=e, ) else: self.on_serialized( original_message=original_message, deserialized_message=parsed_message, result=result, serialized_message=serialized, ) return serialized def _run_core( self, message: Optional[Any] = None ) -> Iterator[Tuple[Optional[Mapping[str, Any]], _Result, Optional[Any]]]: self.on_received(message) for result, deserialized in self._fetch_deserialized_and_result(message): if result is not None: serialized = self._serialize(message, deserialized, result) else: serialized = None yield deserialized, result, serialized def run(self, message: Optional[Any] = None): """ Method that starts execution of pipeline stages. To stop the pipeline raise StopPipeline inside any callback. :param message: Message as is, without deserialization. Or message attributes if the executor was instantiated with neither a deserializer nor a handler (useful to quickly publish message attributes by hand) """ try: publisher_thread = threading.Thread(target=self._try_publish, daemon=True) publisher_thread.start() for deserialized, result, serialized in self._run_core(message): if self.publisher is not None and serialized is not None: assert ( result is not None ) # something is serialized, so there must be a result self.publisher_queue.put( (message, deserialized, result, serialized) ) self.publisher_queue.join() except StopPipeline as e: self.on_stopped(original_message=message, reason=e.reason) except Exception as e: self.on_finished(original_message=message, error=e) else: self.on_finished(original_message=message, error=None) def run_for_result(self, message: Optional[Any] = None): try: if generator_check.is_generator(self.handler): # type: ignore def func(m): for _, _, res in self._run_core(m): yield res result = func(message) else: _, _, result = next(self._run_core(message)) except StopPipeline as e: self.on_stopped(original_message=message, reason=e.reason) raise FetchedNoResult from e except Exception as e: self.on_finished(original_message=message, error=e) raise FetchedNoResult from e else: self.on_finished(original_message=message, error=None) return result def start_listening(self): if self.subscriber is None: raise Exception('Cannot subscribe since subscriber is not initialized.') return self.subscriber.subscribe(callback=self.run) if __name__ == '__main__': def a(m): for i in range(3): yield {"a": str(i)} class StoppingExecutor(Executor): def on_deserialized( self, original_message: Any, deserialized_message: Mapping[str, Any] ): super().on_deserialized(original_message, deserialized_message) raise StopPipeline("the sky is very high") logging.basicConfig(level=logging.DEBUG) StoppingExecutor(lambda m: {'2': 42}).run() # type: ignore import time time.sleep(1) res = Executor(a).run_for_result() for r in res: print(r) PKN84 happyly/listening/listener.py""" :class:`~happyly.listening.listener.BaseListener` and its subclasses. Listener is a form of Executor which is able to run pipeline by an event coming from a subscription. """ import logging from typing import Any, TypeVar, Optional, Generic from happyly.serialization.serializer import Serializer from happyly.serialization.dummy import DUMMY_SERDE from happyly.handling import Handler from happyly.pubsub import BasePublisher from happyly.pubsub.subscriber import BaseSubscriber, SubscriberWithAck from happyly.serialization import Deserializer from .executor import Executor _LOGGER = logging.getLogger(__name__) D = TypeVar("D", bound=Deserializer) P = TypeVar("P", bound=BasePublisher) S = TypeVar("S", bound=BaseSubscriber) SE = TypeVar("SE", bound=Serializer) BaseListener = Executor class ExecutorWithAck(Executor[D, P, SE, SubscriberWithAck], Generic[D, P, SE]): """ Acknowledge-aware listener. Defines :meth:`ListenerWithAck.ack` method. Subclass :class:`ListenerWithAck` and specify when to ack by overriding the corresponding callbacks. """ def __init__( # type: ignore self, subscriber: SubscriberWithAck, handler: Handler, deserializer: D, serializer: SE = DUMMY_SERDE, publisher: Optional[P] = None, ): super().__init__( handler=handler, deserializer=deserializer, serializer=serializer, publisher=publisher, subscriber=subscriber, ) def on_acknowledged(self, message: Any): """ Callback which is called write after message was acknowledged. Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param message: Message as it has been received, without any deserialization """ _LOGGER.info('Message acknowledged') def ack(self, message: Any): """ Acknowledge the message using implementation from subscriber, then log success. :param message: Message as it has been received, without any deserialization """ if self.subscriber is None: raise Exception('Cannot ack since subscriber is not initialized.') self.subscriber.ack(message) self.on_acknowledged(message) class EarlyAckExecutor(ExecutorWithAck[D, P, SE], Generic[D, P, SE]): """ Acknowledge-aware :class:`BaseListener`, which performs :meth:`.ack` right after :meth:`.on_received` callback is finished. """ def _fetch_deserialized_and_result(self, message: Optional[Any]): self.ack(message) super()._fetch_deserialized_and_result(message) class LateAckExecutor(ExecutorWithAck[D, P, SE], Generic[D, P, SE]): """ Acknowledge-aware listener, which performs :meth:`.ack` at the very end of pipeline. """ def on_finished(self, original_message: Any, error: Optional[Exception]): self.ack(original_message) super().on_finished(original_message, error) ListenerWithAck = ExecutorWithAck EarlyAckListener = EarlyAckExecutor LateAckListener = LateAckExecutor PK Nhappyly/logs/__init__.pyPK N%%happyly/logs/base.pyfrom abc import ABC, abstractmethod _not_impl = NotImplementedError('No default implementation in base logger class') class BaseLogger(ABC): @abstractmethod def info(self, message: str): raise _not_impl @abstractmethod def debug(self, message: str): raise _not_impl @abstractmethod def warning(self, message: str): raise _not_impl @abstractmethod def exception(self, message: str): raise _not_impl @abstractmethod def error(self, message: str): raise _not_impl PK N-happyly/logs/request_id.pyfrom logging import Logger from attr import attrs from .base import BaseLogger @attrs(auto_attribs=True) class RequestIdLogger(BaseLogger): logger: Logger topic: str = '' request_id: str = '' def _fmt(self, message): return f' {self.topic:>35} | {self.request_id:>40} |> {message}' def info(self, message: str): self.logger.info(self._fmt(message)) def debug(self, message: str): self.logger.debug(self._fmt(message)) def warning(self, message: str): self.logger.warning(self._fmt(message)) def exception(self, message: str): self.logger.exception(self._fmt(message)) def error(self, message: str): self.logger.error(self._fmt(message)) PK N6G{{happyly/pubsub/__init__.pyfrom .publisher import BasePublisher # noqa: F401 from .subscriber import SubscriberWithAck, BaseSubscriber # noqa: F401 PK NO}D99happyly/pubsub/publisher.pyfrom abc import ABC, abstractmethod from typing import Any, Callable class BasePublisher(ABC): @abstractmethod def publish(self, serialized_message: Any): raise NotImplementedError("No default implementation in base publisher class") @classmethod def from_function(cls, func: Callable[[Any], None]): def publish(self, serialized_message: Any): func(serialized_message) constructed_type = type( '__GeneratedPublisher', (BasePublisher,), {'publish': publish} ) return constructed_type() PK NMSShappyly/pubsub/subscriber.pyfrom abc import ABC, abstractmethod from typing import Callable, Any class BaseSubscriber(ABC): @abstractmethod def subscribe(self, callback: Callable[[Any], Any]): raise NotImplementedError class SubscriberWithAck(BaseSubscriber, ABC): @abstractmethod def ack(self, message): raise NotImplementedError PK N_))happyly/schemas/__init__.pyfrom .schema import Schema # noqa: F401 PK Nhappyly/schemas/schema.pyimport marshmallow class Schema(marshmallow.Schema): """ :doc:`Marshmallow ` schema, which raises errors on mismatch (extra fields provided also raise exception). Subclass it just like any marshmallow :class:`~marshmallow.Schema` to describe schema. Instantiation with no arguments is a good strict default, but you can pass any arguments valid for :class:`marshmallow.Schema` """ def __init__(self, *args, **kwargs): super().__init__(strict=True, *args, **kwargs) @marshmallow.validates_schema(pass_original=True) def check_unknown_fields(self, data, original_data): unknown = set(original_data) - set(self.fields) if unknown: raise marshmallow.ValidationError('Unknown field', unknown) PK N6Mt!happyly/serialization/__init__.pyfrom .deserializer import Deserializer # noqa: F401 from .serializer import Serializer # noqa: F401 from .dummy import DUMMY_DESERIALIZER, DUMMY_SERDE, DummyValidator # noqa: F401 PK NNw%happyly/serialization/deserializer.pyfrom abc import ABC, abstractmethod from typing import Mapping, Any, Callable import marshmallow from attr import attrs _not_impl = NotImplementedError('No default implementation in base Deserializer class') class Deserializer(ABC): @abstractmethod def deserialize(self, message: Any) -> Mapping[str, Any]: raise _not_impl def build_error_result(self, message: Any, error: Exception) -> Mapping[str, Any]: raise error from error @classmethod def from_function(cls, func: Callable[[Any], Mapping[str, Any]]): def deserialize(self, message: Any) -> Mapping[str, Any]: return func(message) constructed_type = type( '__GeneratedDeserializer', (Deserializer,), {'deserialize': deserialize} ) return constructed_type() @attrs(auto_attribs=True, frozen=True) class DeserializerWithSchema(Deserializer, ABC): schema: marshmallow.Schema PK N^whappyly/serialization/dummy.pyimport warnings from typing import Any, Mapping import marshmallow from attr import attrs from happyly.serialization import Serializer from .deserializer import Deserializer class DummySerde(Deserializer, Serializer): def _identity_transform(self, message): if self is DUMMY_DESERIALIZER: warnings.warn( "Please use DUMMY_SERDE instead, " "DUMMY_DESERIALIZER will be removed in happyly v0.9.0.", DeprecationWarning, stacklevel=2, ) if isinstance(message, Mapping): return message elif message is None: return {} else: raise ValueError( 'Dummy deserializer requires message attributes ' 'in form of dict-like structure as input' ) def serialize(self, message_attributes: Mapping[str, Any]) -> Any: return self._identity_transform(message_attributes) def deserialize(self, message) -> Mapping[str, Any]: return self._identity_transform(message) DUMMY_DESERIALIZER: DummySerde = DummySerde() DUMMY_SERDE: DummySerde = DummySerde() """ Serializer/deserializer which transforms message attributes to themselves """ @attrs(auto_attribs=True, frozen=True) class DummyValidator(Deserializer, Serializer): """ Serializer/deserializer which transforms message attributes to themselves along with validating against message schema. """ schema: marshmallow.Schema """ Schema which will be used to validate the provided message """ def _validate(self, message): errors = self.schema.validate(message) if errors != {}: raise marshmallow.ValidationError(str(errors)) def deserialize(self, message: Mapping[str, Any]) -> Mapping[str, Any]: self._validate(message) return message def serialize(self, message_attributes: Mapping[str, Any]) -> Mapping[str, Any]: self._validate(message_attributes) return message_attributes PK NXhappyly/serialization/flask.pyfrom typing import Mapping, Any from attr import attrs from happyly.serialization.serializer import SerializerWithSchema from happyly.serialization import DummyValidator @attrs(auto_attribs=True) class JsonifyForSchema(SerializerWithSchema): def serialize(self, message_attributes: Mapping[str, Any]) -> Any: DummyValidator(schema=self.schema).serialize(message_attributes) # raises error is msg doesn't match schema import flask return flask.jsonify(message_attributes) PK N-56happyly/serialization/json.pyimport json from typing import Any, Mapping from attr import attrs from happyly import Serializer, Deserializer from .deserializer import DeserializerWithSchema from .serializer import SerializerWithSchema class JSONSchemalessSerde(Serializer, Deserializer): """ Simple JSON serializer/deserializer which doesn't validate for any schema """ def serialize(self, message_attributes: Mapping[str, Any]) -> str: return json.dumps(message_attributes) def deserialize(self, message: str) -> Mapping[str, Any]: return json.loads(message) @attrs(auto_attribs=True) class JSONSerializerForSchema(SerializerWithSchema): def serialize(self, message_attributes: Mapping[str, Any]) -> Any: data, _ = self.schema.dumps(message_attributes) return data @attrs(auto_attribs=True) class JSONDeserializerForSchema(DeserializerWithSchema): def deserialize(self, message: Any) -> Mapping[str, Any]: deserialized, _ = self.schema.loads(message) return deserialized @attrs(auto_attribs=True) class BinaryJSONSerializerForSchema(SerializerWithSchema): def serialize(self, message_attributes: Mapping[str, Any]) -> Any: data, _ = self.schema.dumps(message_attributes) return data.encode('utf-8') @attrs(auto_attribs=True) class BinaryJSONDeserialierForSchema(DeserializerWithSchema): def deserialize(self, message: Any) -> Mapping[str, Any]: data = message.data.decode('utf-8') deserialized, _ = self.schema.loads(data) return deserialized PK NZ.#happyly/serialization/serializer.pyfrom abc import ABC, abstractmethod from typing import Mapping, Any, Callable import marshmallow from attr import attrs _no_default = NotImplementedError('No default implementation in base Serializer class') class Serializer(ABC): """ Abstract base class for Serializer. Provides :meth:`serialize` method which should be implemented by subclasses. """ @abstractmethod def serialize(self, message_attributes: Mapping[str, Any]) -> Any: raise _no_default @classmethod def from_function(cls, func: Callable[[Mapping[str, Any]], Any]): def serialize(self, message: Any) -> Mapping[str, Any]: return func(message) constructed_type = type( '__GeneratedSerializer', (Serializer,), {'serialize': serialize} ) return constructed_type() @attrs(auto_attribs=True, frozen=True) class SerializerWithSchema(Serializer, ABC): schema: marshmallow.Schema PKNM happyly/utils/generator_check.pyimport inspect def is_generator(handler): if hasattr(handler, 'handle'): # class-based handler return inspect.isgeneratorfunction(handler.handle) else: # func-based handler return inspect.isgeneratorfunction(handler) PK NN''happyly-0.9.0.dist-info/LICENSEMIT License Copyright (c) 2019 Equeum Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HPOhappyly-0.9.0.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UrPK!HDT 7 ? happyly-0.9.0.dist-info/METADATAXo8_v’6mn^\do%6IԑߛdKqp̣>(/3eoe6D< Y˪Z`j?%ꢐv5'\R+T%JLmER;]*Dnf:Sk QYt9iB)+7g$JM1VU]fܹqSU:8ScEj25''Ixu,}~ huBLύ\aQYq[YE*TD\[y?W\Zά+-U.\a7"I,x\juLxWw~ffY'tNOKoB* +8x DHc2q\gzHݏ dMj`:jc&s|VoUnsrWg⸪Y&KkfV\_|yd{^{/ExƫitB V#=U!Shat؏M,sU(r0zd ^xłDϾmuDܒ@Wdgr 7uVuꄜ|Cx>[62"cgp+GVrlFU+_kojgmGlùڅ~pĐhe:?*}V9@Ė> Lu{.=׬tYráлwdgv_%d+5  38F-]ipOw!htaZ@u)2Fr~,:T"Qv]A "$VcO348Pn{BIMLy+}9SQٖT nFgl0#O9d` aLڠg񪽆_P5abSmGyc WW+Sg'*]`+j&7x }Loݟt?`H֨;*/w;];_!zø}L)3qy:=~9N^5.i gF En.x:&7x7Ap=W"׉XbѐVToP;b ahGAXhpT;)\XH:R9z.\OW0]:I_*ܠ[0"wV"Hyo8^uAgYƖ_Λ'R}qoJb)jKF|DCH~ߵ-nMGHpN9_ rQ%~뮊f!"\ Б@{r)qԥ@\x0=~ۉD7GH袶ܽT(5.i1LUt(S`c(6C#襎Du4IPeӄHF0]cߪƮWNlHnoX׸js6 W!"ɢ?i*C&)%  ںdFP؈qcIXW'Yľt!A:*(cmφC`zPNhvX U&$y}ݣU?:O[ބ47]>: nO#>U!?^/rQn.~I!:kvs!D\g. Y<'\DrG-Ć(*cN*0D fQ9%?g4f2]48QahhUmeŘCtKSMt5]A&4[8)]D8֗h[F_5b IJbv0e<Cʸm! ַ'A,DCq? ; `jEPʩِUڀ tYr azOQRD `@Yhdş_ԭn}-Ez̫D9sPfL{9NV t|<* ZWH78YytSyTJBNwKa|o8qBAaAtKhDgSnJ za! [}^-R˔AA-"5m Úv 0Ywj Bb $`^5m{=|7B(ҽ< 'b s?*hykߴAK3;'BN?lk~_5y>'[& iTNu.`8J)O&[i#q)nz,ٷ6u+6}L5G7zZD7ءK o:MK`S7Hy̵l5WmVe@у+&]]SZ}ԇpߑߟQbvRy24;` 7љJO߀:7yݰ6e& 'l% @18z㐠];p #c.~q5[ ! 8^Q! 8&!U Kg=fw)ZZ40 a|fTe.ʙ-);ev4QT# AQ;lIQGN/]i0^0u0]Oܹn H  R|9bDom'nQ^ݼ';=>՞k&3R"|ncV5|V|JERoJbzT+0vOP n٪J-t.2^4NN{1aw@kTUrQ\iޙV =w] 1spn'O߱XK",|=h')swwHI_%&'?vU&-jv*SX7ۧy=6 >HwK̖(xeGxKz"oq/+ 5|{y}AkC X??ݱ'W9M %UB;݂YlfhNj+KI] b_K>* @=)#k91#\\0c9;84`uOAv{'}n#ƙ1@_Ntlu0?@#b o&~t{S.6e(Y/^OtBp %y=Agv >xïI$~ĿUY׬l΋nش% v"gu31_i~^'\Img:-#+P:5ĉapW ;<2h/ ?TP%d 0b`*~>r[eB<4* q "vP&vUVtdPløW fWvzQiB9NNE~9];&*](NxT>Jc4!n01x3.uKE׮}2p\.IV@/R)8 i;.NƠyj~c^Q)Kp6-@m-[CKVS#6H͢]Z S,Z# USIp^z68 WvǾb_X4R4,"Wٍ }Ɋ';IjM8Bb( E!PKrN*tllhappyly/__init__.pyPK Nh happyly/exceptions.pyPK N!happyly/_deprecations/__init__.pyPKNl((happyly/_deprecations/utils.pyPKNݓAffHhappyly/caching/__init__.pyPK N6yyhappyly/caching/cacher.pyPKN55 happyly/caching/mixins.pyPKNt=M!happyly/google_pubsub/__init__.pyPK Ndj&happyly/google_pubsub/deserializers.pyPKN۪[#happyly/google_pubsub/publishers.pyPKN9 n%#happyly/google_pubsub/redis_cacher.pyPKND $ )happyly/google_pubsub/subscribers.pyPKN@__,c-happyly/google_pubsub/high_level/__init__.pyPKN( /happyly/google_pubsub/high_level/base.pyPKN>wv66-DNhappyly/google_pubsub/high_level/early_ack.pyPKNqMZZ,Rhappyly/google_pubsub/high_level/late_ack.pyPKN *iWhappyly/google_pubsub/high_level/simple.pyPKNzͥ.]happyly/google_pubsub/high_level/with_cache.pyPK N.bbehappyly/handling/__init__.pyPK NBK!Pfhappyly/handling/dummy_handler.pyPK۴N)