PK8yNU}happyly/__init__.py"""Python library for Pub/Sub message handling.""" # flake8: noqa F401 __version__ = '0.4.1' from .listening import Executor, Listener, BaseListener from .schemas import Schema from .caching import Cacher from .serialization import Serializer, Deserializer from .handling import Handler, DUMMY_HANDLER def _welcome(): import sys sys.stdout.write(f'Using happyly v{__version__}.\n') def _setup_warnings(): import warnings for warning_type in PendingDeprecationWarning, DeprecationWarning: warnings.filterwarnings( 'always', category=warning_type, module=r'^{0}\.'.format(__name__) ) _welcome() _setup_warnings() del _welcome del _setup_warnings PKxyNv))happyly/caching/__init__.pyfrom .cacher import Cacher # noqa: F401 PKxyNooohappyly/caching/cacher.pyfrom abc import ABC, abstractmethod from typing import Any _no_default_impl = NotImplementedError('No default implementation for class Cacher') class Cacher(ABC): """ Abstract base class which defines interface of any caching component to be used via CacheByRequestIdMixin or similar mixin. """ @abstractmethod def add(self, data: Any, key: str): """ Add the provided data to cache and store it by the provided key. """ raise _no_default_impl @abstractmethod def remove(self, key: str): """ Remove data from cache which is stored by the provided key. """ raise _no_default_impl @abstractmethod def get(self, key: str): """ Returns data which is stored in cache by the provided key. """ raise _no_default_impl PKxyNW^happyly/caching/mixins.pyimport json from typing import Any, Mapping, Optional from happyly.caching.cacher import Cacher from happyly.handling import HandlingResult class CacheByRequestIdMixin: """ Mixin which adds caching functionality to Listener. Utilizes notions of listener's topic and request id of message - otherwise will not work. To be used via multiple inheritance. For example, given some component SomeListener you can define its caching equivalent by defining SomeCachedListener which inherits from both SomeListener and CacheByRequestIdMixin. """ def __init__(self, cacher: Cacher): self.cacher = cacher def on_received(self, message: Any): super().on_received(message) try: req_id = self._get_req_id(message) except Exception: pass else: data = json.dumps( {'topic': self.from_topic, 'data': json.loads(message.data)} ) self.cacher.add(data, key=req_id) def _get_req_id(self, message: Any) -> str: assert self.deserializer is not None attribtues = self.deserializer.deserialize(message) return attribtues[self.deserializer.request_id_field] def _rm(self, parsed_message: Mapping[str, Any]): assert self.deserializer is not None self.cacher.remove(parsed_message[self.deserializer.request_id_field]) def on_published( self, original_message: Any, parsed_message: Optional[Mapping[str, Any]], result: HandlingResult, ): super().on_published(original_message, parsed_message, result) if parsed_message is not None: self._rm(parsed_message) def on_deserialization_failed(self, message: Any, error: Exception): super().on_deserialization_failed(message, error) try: req_id = self._get_req_id(message) except Exception: pass else: self.cacher.remove(key=req_id) PKxyNrWu!happyly/google_pubsub/__init__.py# flake8: noqa F401 from .high_level import ( GoogleSimpleSender, GoogleSimpleReceiver, GoogleReceiveAndReplyComponent, GoogleSimpleReceiveAndReply, GoogleCachedReceiveAndReply, GoogleCachedReceiver, GoogleLateAckReceiver, GoogleLateAckReceiveAndReply, ) from .redis_cacher import RedisCacher from .deserializers import JSONDeserializerWithRequestIdRequired from .serializers import BinaryJSONSerializer from .publishers import GooglePubSubPublisher from .subscribers import GooglePubSubSubscriber PKyNά&happyly/google_pubsub/deserializers.pyfrom typing import Mapping, Any import json from attr import attrs import marshmallow from happyly.serialization import Deserializer @attrs(auto_attribs=True, frozen=True) class JSONDeserializerWithRequestIdRequired(Deserializer): """ Deserializer for Google Pub/Sub messages which expects a message of certain schema to be written in `message.data` as JSON encoded into binary data with utf-8. Schema used with this serializer must define some field which is used as request id (you can specify which one in constructor). If `JSONDeserializerWithRequestIdRequired` fails to deserialize some message, it tries to fetch request id and provide error message. """ schema: marshmallow.Schema request_id_field: str = 'request_id' status_field: str = 'status' error_field: str = 'error' _status_error: str = 'ERROR' def deserialize(self, message: Any) -> Mapping[str, Any]: """ Loads message attributes from `message.data`, expects it to be a JSON which corresponds `self.schema` encoded with utf-8. """ data = message.data.decode('utf-8') deserialized, _ = self.schema.loads(data) return deserialized def build_error_result(self, message: Any, error: Exception) -> Mapping[str, Any]: """ Provides a fallback result when `deserialize` fails. Returns a dict with attributes: * * * Field names can be specified in constructor. If request id cannot be fetched, it is set to an empty string. """ attributes = json.loads(message.data) try: return { self.request_id_field: attributes[self.request_id_field], self.status_field: self._status_error, self.error_field: repr(error), } except KeyError as e: return { self.request_id_field: '', self.status_field: self._status_error, self.error_field: f'{repr(e)}: ' f'Message contains no {self.request_id_field}', } PKcqN#happyly/google_pubsub/publishers.pyfrom typing import Any from google.cloud import pubsub_v1 from happyly.pubsub import Publisher class GooglePubSubPublisher(Publisher): def __init__(self, project, *args, **kwargs): super().__init__(*args, **kwargs) self.project = project self._publisher_client = pubsub_v1.PublisherClient() def __attrs_post_init__(self): self._publisher_client = pubsub_v1.PublisherClient() def publish_message(self, serialized_message: Any, to: str): future = self._publisher_client.publish( f'projects/{self.project}/topics/{to}', serialized_message ) try: future.result() return except Exception as e: raise e PKO{yN'o,NN%happyly/google_pubsub/redis_cacher.pyimport logging import redis from happyly.caching.cacher import Cacher _LOGGER = logging.getLogger(__name__) class RedisCacher(Cacher): def __init__(self, host: str, port: int, prefix: str = ''): self.prefix = prefix self.client = redis.StrictRedis(host=host, port=port) _LOGGER.info( f'Cache was successfully initialized with Redis client ({host}:{port})' ) if self.prefix != '': _LOGGER.info(f'Using prefix {self.prefix}') def add(self, data: str, key: str): full_key = f'{self.prefix}{key}' self.client.set(full_key, data) _LOGGER.info(f'Cached message with id {key}') def remove(self, key: str): full_key = f'{self.prefix}{key}' self.client.delete(full_key) _LOGGER.info(f'Message with id {key} was removed from cache') def get(self, key: str): full_key = f'{self.prefix}{key}' self.client.get(full_key) def get_all(self): keys: str = self.client.keys() return [self.client.get(k) for k in keys if k.startswith(self.prefix)] PKaN%v$happyly/google_pubsub/serializers.pyfrom typing import Mapping, Any import marshmallow from attr import attrs from happyly.serialization.serializer import Serializer @attrs(auto_attribs=True, frozen=True) class BinaryJSONSerializer(Serializer): schema: marshmallow.Schema def serialize(self, message_attributes: Mapping[str, Any]) -> Any: data, _ = self.schema.dumps(message_attributes) return data.encode('utf-8') PKxyNVX$happyly/google_pubsub/subscribers.pyimport logging from typing import Callable, Any from attr import attrs, attrib from google.cloud import pubsub_v1 from happyly.pubsub import SubscriberWithAck _LOGGER = logging.getLogger(__name__) @attrs(auto_attribs=True) class GooglePubSubSubscriber(SubscriberWithAck): project: str subscription_name: str _subscription_client: pubsub_v1.SubscriberClient = attrib(init=False) _subscription_path: str = attrib(init=False) def __attrs_post_init__(self): s = pubsub_v1.SubscriberClient() self._subscription_path = s.subscription_path( self.project, self.subscription_name ) self._subscription_client = s def subscribe(self, callback: Callable[[Any], Any]): _LOGGER.info(f'Starting to listen to {self.subscription_name}') return self._subscription_client.subscribe(self._subscription_path, callback) def ack(self, message): message.ack() PKxyNOO,happyly/google_pubsub/high_level/__init__.pyfrom .simple import ( # noqa: F401 GoogleSimpleSender, GoogleSimpleReceiver, GoogleSimpleReceiveAndReply, GoogleReceiveAndReplyComponent, ) from .with_cache import GoogleCachedReceiveAndReply, GoogleCachedReceiver # noqa: F401 from .late_ack import GoogleLateAckReceiver, GoogleLateAckReceiveAndReply # noqa: F401 PK{yN-6,happyly/google_pubsub/high_level/late_ack.pyimport marshmallow from happyly import Handler from ..subscribers import GooglePubSubSubscriber from ..publishers import GooglePubSubPublisher from ..serializers import BinaryJSONSerializer from ..deserializers import JSONDeserializerWithRequestIdRequired from happyly.listening.listener import LateAckListener class GoogleLateAckReceiver( LateAckListener[JSONDeserializerWithRequestIdRequired, None] ): def __init__( self, input_schema: marshmallow.Schema, from_subscription: str, project: str, handler: Handler, from_topic: str = '', ): self.from_topic = from_topic subscriber = GooglePubSubSubscriber( project=project, subscription_name=from_subscription ) deserializer = JSONDeserializerWithRequestIdRequired(schema=input_schema) super().__init__( subscriber=subscriber, handler=handler, deserializer=deserializer ) class GoogleLateAckReceiveAndReply( LateAckListener[JSONDeserializerWithRequestIdRequired, GooglePubSubPublisher] ): def __init__( self, handler: Handler, input_schema: marshmallow.Schema, from_subscription: str, output_schema: marshmallow.Schema, to_topic: str, project: str, from_topic: str = '', ): self.from_topic = from_topic subscriber = GooglePubSubSubscriber( project=project, subscription_name=from_subscription ) deserializer = JSONDeserializerWithRequestIdRequired(schema=input_schema) publisher = GooglePubSubPublisher( project=project, publish_all_to=to_topic, serializer=BinaryJSONSerializer(schema=output_schema), ) super().__init__( handler=handler, deserializer=deserializer, subscriber=subscriber, publisher=publisher, ) PK{yN冠 *happyly/google_pubsub/high_level/simple.pyfrom typing import Union, Optional import marshmallow from happyly.handling.dummy_handler import DUMMY_HANDLER from ..deserializers import JSONDeserializerWithRequestIdRequired from ..publishers import GooglePubSubPublisher from ..serializers import BinaryJSONSerializer from ..subscribers import GooglePubSubSubscriber from happyly.handling import Handler from happyly.listening.executor import Executor from happyly.listening.listener import EarlyAckListener class GoogleSimpleSender( Executor[Union[None, JSONDeserializerWithRequestIdRequired], GooglePubSubPublisher] ): def __init__( self, output_schema: marshmallow.Schema, to_topic: str, project: str, handler: Handler = DUMMY_HANDLER, input_schema: Optional[marshmallow.Schema] = None, ): if input_schema is None: deserializer = None else: deserializer = JSONDeserializerWithRequestIdRequired(schema=input_schema) publisher = GooglePubSubPublisher( project=project, publish_all_to=to_topic, serializer=BinaryJSONSerializer(schema=output_schema), ) super().__init__( publisher=publisher, handler=handler, deserializer=deserializer ) class GoogleSimpleReceiver( EarlyAckListener[JSONDeserializerWithRequestIdRequired, None] ): def __init__( self, input_schema: marshmallow.Schema, from_subscription: str, project: str, handler: Handler, from_topic: str = '', ): self.from_topic = from_topic subscriber = GooglePubSubSubscriber( project=project, subscription_name=from_subscription ) deserializer = JSONDeserializerWithRequestIdRequired(schema=input_schema) super().__init__( subscriber=subscriber, handler=handler, deserializer=deserializer ) class GoogleSimpleReceiveAndReply( EarlyAckListener[JSONDeserializerWithRequestIdRequired, GooglePubSubPublisher] ): def __init__( self, handler: Handler, input_schema: marshmallow.Schema, from_subscription: str, output_schema: marshmallow.Schema, to_topic: str, project: str, from_topic: str = '', ): self.from_topic = from_topic subscriber = GooglePubSubSubscriber( project=project, subscription_name=from_subscription ) deserializer = JSONDeserializerWithRequestIdRequired(schema=input_schema) publisher = GooglePubSubPublisher( project=project, publish_all_to=to_topic, serializer=BinaryJSONSerializer(schema=output_schema), ) super().__init__( handler=handler, deserializer=deserializer, subscriber=subscriber, publisher=publisher, ) # for compatibility GoogleReceiveAndReplyComponent = GoogleSimpleReceiveAndReply PK{yNh,@MM.happyly/google_pubsub/high_level/with_cache.pyimport marshmallow from happyly.caching.cacher import Cacher from happyly.caching.mixins import CacheByRequestIdMixin from happyly.handling import Handler from .simple import GoogleSimpleReceiveAndReply, GoogleSimpleReceiver class GoogleCachedReceiveAndReply(CacheByRequestIdMixin, GoogleSimpleReceiveAndReply): def __init__( self, handler: Handler, input_schema: marshmallow.Schema, from_subscription: str, from_topic: str, output_schema: marshmallow.Schema, to_topic: str, project: str, cacher: Cacher, ): GoogleSimpleReceiveAndReply.__init__( self, handler, input_schema, from_subscription, output_schema, to_topic, project, from_topic, ) CacheByRequestIdMixin.__init__(self, cacher) class GoogleCachedReceiver(CacheByRequestIdMixin, GoogleSimpleReceiver): def __init__( self, handler: Handler, input_schema: marshmallow.Schema, from_subscription: str, from_topic: str, project: str, cacher: Cacher, ): GoogleSimpleReceiver.__init__( self, input_schema, from_subscription, project, handler, from_topic ) CacheByRequestIdMixin.__init__(self, cacher) PKxyN.ȼ*happyly/handling/__init__.pyfrom .handler import Handler # noqa: F401 from .handling_result import HandlingResult, HandlingResultStatus # noqa: F401 from .dummy_handler import DUMMY_HANDLER # noqa: F401 PKxyNBK!happyly/handling/dummy_handler.pyfrom typing import Mapping, Any from happyly.handling.handler import Handler class _DummyHandler(Handler): def handle(self, message: Mapping[str, Any]): return message def on_handling_failed(self, message: Mapping[str, Any], error: Exception): raise error DUMMY_HANDLER: _DummyHandler = _DummyHandler() """ Handler which just returns provided message attributes (kind of an "identity function") """ PK+yN{g^^happyly/handling/handler.pyfrom abc import ABC, abstractmethod from typing import Mapping, Any from .types import ZeroToManyParsedMessages from .handling_result import HandlingResult _no_base_impl = NotImplementedError('No default implementation in base Handler class') class Handler(ABC): """ A class containing logic to handle a parsed message. """ @abstractmethod def handle(self, message: Mapping[str, Any]) -> ZeroToManyParsedMessages: """ Applies logic using a provided message, optionally gives back one or more results. Each result consists of message attributes which can be serialized and sent. When fails, calls `on_handling_failed` :param message: A parsed message as a dictionary of attributes :return: None if no result is extracted from handling, a dictionary of attributes for single result or a list of dictionaries if handling provides multiple results """ raise _no_base_impl @abstractmethod def on_handling_failed( self, message: Mapping[str, Any], error: Exception ) -> ZeroToManyParsedMessages: """ Applies fallback logic using a provided message when `handle` fails, optionally gives back one or more results. Enforces users of `Handler` class to provide explicit strategy for errors. If you want to propagate error further to the underlying Executor/Handler, just raise an exception here. :param message: A parsed message as a dictionary of attributes :param error: Error raised by `handle` :return: None if no result is extracted from handling, a dictionary of attributes for single result or a list of dictionaries if handling provides multiple results """ raise _no_base_impl def __call__(self, message: Mapping[str, Any]) -> HandlingResult: try: result_data = self.handle(message) return HandlingResult.ok(result_data) except Exception as e: result_data = self.on_handling_failed(message, e) return HandlingResult.err(result_data) PKcqN#happyly/handling/handling_result.pyfrom enum import Enum from attr import attrs from .types import ZeroToManyParsedMessages class HandlingResultStatus(Enum): OK = 'OK' ERR = 'ERR' @attrs(auto_attribs=True, frozen=True) class HandlingResult: status: HandlingResultStatus data: ZeroToManyParsedMessages @classmethod def ok(cls, data): return HandlingResult(status=HandlingResultStatus.OK, data=data) @classmethod def err(cls, data): return HandlingResult(status=HandlingResultStatus.ERR, data=data) PK~aNfhappyly/handling/types.pyfrom typing import Mapping, Any, Union, List _ParsedMessage = Mapping[str, Any] ZeroToManyParsedMessages = Union[_ParsedMessage, List[_ParsedMessage], None] PKxyNFjhappyly/listening/__init__.py# flake8: noqa F401 from .listener import Listener, BaseListener, EarlyAckListener, LateAckListener from .executor import Executor PKMyN n##happyly/listening/executor.pyimport logging from typing import Mapping, Any, Optional, TypeVar, Generic from attr import attrs from happyly.handling.dummy_handler import DUMMY_HANDLER from happyly.handling import Handler, HandlingResult from happyly.serialization.deserializer import Deserializer from happyly.pubsub import Publisher _LOGGER = logging.getLogger(__name__) D = TypeVar("D", bound=Deserializer) P = TypeVar("P", bound=Publisher) @attrs(auto_attribs=True) class Executor(Generic[D, P]): """ Component which is able to run handler as a part of more complex pipeline. Implements managing of stages inside the pipeline (deserialization, handling, serialization, publishing) and introduces callbacks between the stages which can be easily overridden. Executor does not implement stages themselves, it takes internal implementation of stages from corresponding components: handler, deserializer, publisher. It means that executor is universal and can work with any serialization/messaging technology depending on concrete components provided to executor's constructor. """ handler: Handler = DUMMY_HANDLER """ Provides implementation of handling stage to Executor. """ deserializer: Optional[D] = None """ Provides implementation of deserialization stage to Executor. If not present, no deserialization is performed. """ publisher: Optional[P] = None """ Provides implementation of serialization and publishing stages to Executor. If not present, no publishing is performed. """ def on_received(self, message: Any): """ Callback which is called as soon as pipeline is run. Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param message: Message as it has been received, without any deserialization """ _LOGGER.info(f"Received message:\n {message}") def on_deserialized(self, original_message: Any, parsed_message: Mapping[str, Any]): """ Callback which is called right after message was deserialized successfully. Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param original_message: Message as it has been received, without any deserialization :param parsed_message: Message attributes after deserialization """ _LOGGER.info( f"Message successfully deserialized into attributes:\n {parsed_message}" ) def on_deserialization_failed(self, message: Any, error: Exception): """ Callback which is called right after deserialization failure. Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param message: Message as it has been received, without any deserialization :param error: exception object which was raised """ _LOGGER.exception( f"Was not able to deserialize the following message:\n{message}" ) def on_handled( self, original_message: Any, parsed_message: Mapping[str, Any], result: HandlingResult, ): """ Callback which is called right after message was handled (successfully or not, but without raising an exception). Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param original_message: Message as it has been received, without any deserialization :param parsed_message: Message attributes after deserialization :param result: Result fetched from handler (also shows if handling was successful) """ _LOGGER.info(f"Message handled, status {result.status}") def on_handling_failed( self, original_message: Any, parsed_message: Mapping[str, Any], error: Exception ): """ Callback which is called if handler's `on_handling_failed` raises an exception. Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param original_message: Message as it has been received, without any deserialization :param parsed_message: Message attributes after deserialization :param error: exception object which was raised """ _LOGGER.exception(f'Handler raised an exception.') def on_published( self, original_message: Any, parsed_message: Optional[Mapping[str, Any]], result: HandlingResult, ): """ Callback which is called right after message was published successfully. Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param original_message: Message as it has been received, without any deserialization :param parsed_message: Message attributes after deserialization :param result: Result fetched from handler (also shows if handling was successful) """ _LOGGER.info(f"Published result:\n{result}") def on_publishing_failed( self, original_message: Any, parsed_message: Optional[Mapping[str, Any]], result: HandlingResult, error: Exception, ): """ Callback which is called when publisher fails to publish. Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param original_message: Message as it has been received, without any deserialization :param parsed_message: Message attributes after deserialization :param result: Result fetched from handler (also shows if handling was successful) :param error: exception object which was raised """ _LOGGER.exception(f"Failed to publish result:\n{result}") def _when_parsing_succeeded(self, original: Any, parsed: Mapping[str, Any]): try: result = self.handler(parsed) self.on_handled( original_message=original, parsed_message=parsed, result=result ) except Exception as e: self.on_handling_failed(original, parsed, e) return if self.publisher is not None: self._try_publish(original, parsed, result) def _when_parsing_failed(self, message: Any, error: Exception): if self.publisher is None: return assert self.deserializer is not None try: result = self.deserializer.build_error_result(message, error) handling_result = HandlingResult.err(result) except Exception: _LOGGER.exception( "Deserialization failed and error result cannot be built." ) else: self._try_publish(original=message, parsed=None, result=handling_result) def _try_publish( self, original: Any, parsed: Optional[Mapping[str, Any]], result: HandlingResult ): assert self.publisher is not None try: self.publisher.publish_result(result) self.on_published( original_message=original, parsed_message=parsed, result=result ) except Exception as e: self.on_publishing_failed( original_message=original, parsed_message=parsed, result=result, error=e ) def _run_no_deser(self, message: Optional[Any]): if message is not None: if self.handler is DUMMY_HANDLER: self._when_parsing_succeeded(original=message, parsed=message) else: raise ValueError("No deserializer to parse non-empty message.") if message is None: self._when_parsing_succeeded(original=None, parsed={}) def _after_on_received(self, message: Optional[Any]): try: assert self.deserializer is not None parsed = self.deserializer.deserialize(message) except Exception as e: self.on_deserialization_failed(message, error=e) self._when_parsing_failed(message, error=e) else: self.on_deserialized(message, parsed) self._when_parsing_succeeded(original=message, parsed=parsed) def _run_with_deser(self, message: Optional[Any]): self.on_received(message) self._after_on_received(message) def run(self, message: Optional[Any] = None): """ Method that starts execution of pipeline stages. :param message: Message as is, without deserialization. Or message attributes if the executor was instantiated with neither a deserializer nor a handler (useful to quickly publish message attributes by hand) """ if self.deserializer is None: self._run_no_deser(message) else: self._run_with_deser(message) PKU}yNs7xBhappyly/listening/listener.pyimport logging import warnings from typing import Any, TypeVar, Optional, Generic from happyly.handling import Handler from happyly.handling.dummy_handler import DUMMY_HANDLER from happyly.pubsub import Publisher from happyly.pubsub.subscriber import BaseSubscriber, SubscriberWithAck from happyly.serialization import Deserializer from .executor import Executor _LOGGER = logging.getLogger(__name__) D = TypeVar("D", bound=Deserializer) P = TypeVar("P", bound=Publisher) S = TypeVar("S", bound=BaseSubscriber) class BaseListener(Executor[D, P], Generic[D, P, S]): """ Listener is a form of Executor which is able to run pipeline by an event coming from a subscription. Listener itself doesn't know how to subscribe, it subscribes via a provided subscriber. As any executor, implements managing of stages inside the pipeline (deserialization, handling, serialization, publishing) and contains callbacks between the stages which can be easily overridden. As any executor, listener does not implement stages themselves, it takes internal implementation of stages from corresponding components: handler, deserializer, publisher. It means that listener is universal and can work with any serialization/messaging technology depending on concrete components provided to listener's constructor. """ def __init__( self, subscriber: S, handler: Handler, deserializer: Optional[D] = None, publisher: Optional[P] = None, ): assert handler is not DUMMY_HANDLER super().__init__( handler=handler, deserializer=deserializer, publisher=publisher ) self.subscriber: S = subscriber """ Provides implementation of how to subscribe. """ def start_listening(self): return self.subscriber.subscribe(callback=self.run) class EarlyAckListener(BaseListener[D, P, SubscriberWithAck], Generic[D, P]): """ Acknowledge-aware listener, which performs `ack` right after `on_received` callback is finished. """ def __init__( self, subscriber: SubscriberWithAck, handler: Handler, deserializer: Optional[D] = None, publisher: Optional[P] = None, ): super().__init__( handler=handler, deserializer=deserializer, publisher=publisher, subscriber=subscriber, ) def on_acknowledged(self, message: Any): """ Callback which is called write after message was acknowledged. Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param message: Message as it has been received, without any deserialization """ _LOGGER.info('Message acknowledged') def _after_on_received(self, message: Optional[Any]): self.subscriber.ack(message) self.on_acknowledged(message) super()._after_on_received(message) class LateAckListener(BaseListener[D, P, SubscriberWithAck], Generic[D, P]): """ Acknowledge-aware listener, which performs `ack` at the very end of pipeline. """ def __init__( self, subscriber: SubscriberWithAck, handler: Handler, deserializer: Optional[D] = None, publisher: Optional[P] = None, ): super().__init__( handler=handler, deserializer=deserializer, publisher=publisher, subscriber=subscriber, ) def on_acknowledged(self, message: Any): """ Callback which is called write after message was acknowledged. Override it in your custom Executor/Listener if needed, but don't forget to call implementation from base class. :param message: Message as it has been received, without any deserialization """ _LOGGER.info('Message acknowledged') def _after_on_received(self, message: Optional[Any]): super()._after_on_received(message) self.subscriber.ack(message) self.on_acknowledged(message) # for compatibility, to be deprecated class Listener(EarlyAckListener[D, P], Generic[D, P]): def __init__(self, *args, **kwargs): warnings.warn( "Please use EarlyAckListener instead, " "Listener will be deprecated in the future.", PendingDeprecationWarning, ) super().__init__(*args, **kwargs) PKxyN<happyly/pubsub/__init__.pyfrom .publisher import Publisher # noqa: F401 from .subscriber import Subscriber, SubscriberWithAck, BaseSubscriber # noqa: F401 PKcqNO[  happyly/pubsub/publisher.pyfrom abc import ABC, abstractmethod from typing import Optional, Mapping, Any, List from happyly.handling import HandlingResult, HandlingResultStatus from happyly.serialization.serializer import Serializer class Publisher(ABC): def __init__( self, serializer: Serializer, publish_all_to: Optional[str] = None, publish_success_to: Optional[str] = None, publish_failure_to: Optional[str] = None, ): self._serializer = serializer if publish_all_to is not None and all( p is None for p in [publish_success_to, publish_failure_to] ): self.publish_success_to: str = publish_all_to self.publish_failure_to: str = publish_all_to elif ( publish_success_to is not None and publish_failure_to is not None ) and publish_all_to is None: self.publish_success_to: str = publish_success_to self.publish_failure_to: str = publish_failure_to else: raise ValueError( """Provide "publish_all_to" only, or else provide both "publish_success_to" and "publish_failure_to""" ) @abstractmethod def publish_message(self, serialized_message: Any, to: str): raise NotImplementedError("No default implementation in base Publisher class") def _get_destination(self, status: HandlingResultStatus): if status == HandlingResultStatus.OK: return self.publish_success_to elif status == HandlingResultStatus.ERR: return self.publish_failure_to else: raise ValueError(f"Unknown status {status}") def _publish_serialized(self, data: Mapping[str, Any], to: str): serialized = self._serializer.serialize(data) self.publish_message(serialized, to) def publish_result(self, result: HandlingResult): data = result.data if data is None: return destination = self._get_destination(result.status) if isinstance(data, Mapping): self._publish_serialized(data, to=destination) elif isinstance(data, List): for item in data: self._publish_serialized(item, to=destination) else: raise ValueError("Invalid data structure") PKxyNOdMohappyly/pubsub/subscriber.pyimport warnings from abc import ABC, abstractmethod from typing import Callable, Any class BaseSubscriber(ABC): @abstractmethod def subscribe(self, callback: Callable[[Any], Any]): raise NotImplementedError class SubscriberWithAck(BaseSubscriber, ABC): @abstractmethod def ack(self, message): raise NotImplementedError # for compatibility, to be deprecated class Subscriber(SubscriberWithAck, ABC): def __init__(self, *args, **kwargs): warnings.warn( "Please use SubscriberWithAck instead, " "Subscriber will be deprecated in the future.", DeprecationWarning, ) super().__init__(*args, **kwargs) PKxyN_))happyly/schemas/__init__.pyfrom .schema import Schema # noqa: F401 PKxyN, happyly/schemas/schema.pyimport marshmallow class Schema(marshmallow.Schema): """ Marshmallow schema, which raises errors on mismatch (extra fields provided also raise exception). Subclass it just like any marshmallow Schema to describe schema. Instantiation with no arguments is a good strict default, but you can pass any arguments valid for `marshmallow.Schema` """ def __init__(self, *args, **kwargs): super().__init__(strict=True, *args, **kwargs) @marshmallow.validates_schema(pass_original=True) def check_unknown_fields(self, data, original_data): unknown = set(original_data) - set(self.fields) if unknown: raise marshmallow.ValidationError('Unknown field', unknown) PKcqN]5$ff!happyly/serialization/__init__.pyfrom .deserializer import Deserializer # noqa: F401 from .serializer import Serializer # noqa: F401 PKcqNJ%happyly/serialization/deserializer.pyfrom abc import ABC, abstractmethod from typing import Mapping, Any _not_impl = NotImplementedError('No default implementation in base Deserializer class') class Deserializer(ABC): @abstractmethod def deserialize(self, message: Any) -> Mapping[str, Any]: raise _not_impl @abstractmethod def build_error_result(self, message: Any, error: Exception) -> Mapping[str, Any]: raise _not_impl PKcqNe,,#happyly/serialization/serializer.pyfrom abc import ABC, abstractmethod from typing import Mapping, Any _no_default = NotImplementedError('No default implementation in base Serializer class') class Serializer(ABC): @abstractmethod def serialize(self, message_attributes: Mapping[str, Any]) -> Any: raise _no_default PKYaNk))happyly-0.4.1.dist-info/LICENSEMIT License Copyright (c) 2019 equeumco Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HPOhappyly-0.4.1.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UrPK!H"  happyly-0.4.1.dist-info/METADATAV[o8~ϯ8JV-[ T,ZU+$nb^_f=Ǚig o}.-D hPrxkڀ54Gr^Rq4QXvXpx#O>x1u>E-)z|L?#*YwP#P+ct,(T9kG8)F%=CR[O4|ZϫEu0bZݝ2m Owe7N  9E $12;aw>>&^[*C971b7W0J免D*FIxGk޴1OUݼ '+eZZ[^y= u]Ɍ,+cC!vwqV_{2OJ4?l;o= PK!Hڰ~ happyly-0.4.1.dist-info/RECORDIH[}9 6 ^xY~2]".j|Gc 8W|96 Gv]«"̢ve>49c @BPQE}Ջf+o::Э0VVBEQ@ (٨yrUApOPp!Q]h.  =3RzĘ0I}es]o5)@QK=r_a"* rfCW++rѨLD4lh̭_yn }^XqC0Hƹ^x;c2<ay>~=ai9g] 4C$lmz do Eשu "?O}M/2N*_=BICX6[Z g @5}M+(b{*"LmCV<""}^f\m[\V9l&/pO`0k&LFc7+aʊO*R$췄rEv$Rw9zUF[t#2Ѹ