PKM{rN"]HIIhappyly/__init__.py"""Python library for Pub/Sub message handling""" __version__ = '0.2.0' PKzrNhappyly/caching/__init__.pyPKzrNy_happyly/caching/cacher.pyfrom abc import ABC, abstractmethod from typing import Any _no_default_impl = NotImplementedError('No default implementation for class Cacher') class Cacher(ABC): @abstractmethod def add(self, message: Any, key: str): raise _no_default_impl @abstractmethod def remove(self, key: str): raise _no_default_impl @abstractmethod def get(self, key: str): raise _no_default_impl PKzrN##happyly/caching/mixins.pyfrom typing import Any, Mapping, Optional from happyly.caching.cacher import Cacher from happyly.handling import HandlingResult class CacheByRequestIdMixin: def __init__(self, cacher: Cacher): self.cacher = cacher def on_received(self, message: Any): super().on_received(message) try: req_id = self._get_req_id(message) except Exception: pass else: self.cacher.add(message, key=req_id) def _get_req_id(self, message: Any) -> str: assert self.deserializer is not None attribtues = self.deserializer.deserialize(message) return attribtues[self.deserializer.request_id_field] def _rm(self, parsed_message: Mapping[str, Any]): assert self.deserializer is not None self.cacher.remove(parsed_message[self.deserializer.request_id_field]) def on_published( self, original_message: Any, parsed_message: Optional[Mapping[str, Any]], result: HandlingResult, ): super().on_published(original_message, parsed_message, result) if parsed_message is not None: self._rm(parsed_message) def on_publishing_failed( self, original_message: Any, parsed_message: Optional[Mapping[str, Any]], result: HandlingResult, error: Exception, ): super().on_publishing_failed(original_message, parsed_message, result, error) if parsed_message is not None: self._rm(parsed_message) def on_deserialization_failed(self, message: Any, error: Exception): super().on_deserialization_failed(message, error) try: req_id = self._get_req_id(message) except Exception: pass else: self.cacher.remove(key=req_id) PKzrN!happyly/google_pubsub/__init__.pyfrom .high_level import ( # noqa: F401 GoogleSimpleSender, GoogleSimpleReceiver, GoogleReceiveAndReplyComponent, GoogleSimpleReceiveAndReply, GoogleCachedReceiveAndReply, GoogleCachedReceiver, ) PK\lNx@((&happyly/google_pubsub/deserializers.pyfrom typing import Mapping, Any import json from attr import attrs import marshmallow from happyly.serialization import Deserializer @attrs(auto_attribs=True, frozen=True) class JSONDeserializerWithRequestIdRequired(Deserializer): schema: marshmallow.Schema request_id_field: str = 'request_id' status_field: str = 'status' error_field: str = 'error' _status_error: str = 'ERROR' def deserialize(self, message: Any) -> Mapping[str, Any]: data = message.data.decode('utf-8') deserialized, _ = self.schema.loads(data) return deserialized def build_error_result(self, message: Any, error: Exception) -> Mapping[str, Any]: attributes = json.load(message.data) try: return { self.request_id_field: attributes[self.request_id_field], self.status_field: self._status_error, self.error_field: repr(error), } except AttributeError: raise ValueError(f'message {message} contains no {self.request_id_field}') PKcqN#happyly/google_pubsub/publishers.pyfrom typing import Any from google.cloud import pubsub_v1 from happyly.pubsub import Publisher class GooglePubSubPublisher(Publisher): def __init__(self, project, *args, **kwargs): super().__init__(*args, **kwargs) self.project = project self._publisher_client = pubsub_v1.PublisherClient() def __attrs_post_init__(self): self._publisher_client = pubsub_v1.PublisherClient() def publish_message(self, serialized_message: Any, to: str): future = self._publisher_client.publish( f'projects/{self.project}/topics/{to}', serialized_message ) try: future.result() return except Exception as e: raise e PKzrNLE55%happyly/google_pubsub/redis_cacher.pyimport logging from typing import Any import redis from happyly.caching.cacher import Cacher _LOGGER = logging.getLogger(__name__) class RedisCacher(Cacher): def __init__(self, host, port): self.client = redis.StrictRedis(host=host, port=port) _LOGGER.info( f'Cache was successfully initialized with Redis client ({host}:{port})' ) def add(self, message: Any, key: str): self.client.set(key, message.data) _LOGGER.info(f'Cached message with id {key}') def remove(self, key: str): self.client.delete(key) _LOGGER.info(f'Message with id {key} was removed from cache') def get(self, key: str): self.client.get(key) def get_all(self): keys = self.client.keys() return [self.client.get(k) for k in keys] PKaN%v$happyly/google_pubsub/serializers.pyfrom typing import Mapping, Any import marshmallow from attr import attrs from happyly.serialization.serializer import Serializer @attrs(auto_attribs=True, frozen=True) class BinaryJSONSerializer(Serializer): schema: marshmallow.Schema def serialize(self, message_attributes: Mapping[str, Any]) -> Any: data, _ = self.schema.dumps(message_attributes) return data.encode('utf-8') PKcqN47$happyly/google_pubsub/subscribers.pyimport logging from typing import Callable, Any from attr import attrs, attrib from google.cloud import pubsub_v1 import happyly.pubsub _LOGGER = logging.getLogger(__name__) @attrs(auto_attribs=True) class GooglePubSubSubscriber(happyly.pubsub.Subscriber): project: str subscription_name: str _subscription_client: pubsub_v1.SubscriberClient = attrib(init=False) _subscription_path: str = attrib(init=False) def __attrs_post_init__(self): s = pubsub_v1.SubscriberClient() self._subscription_path = s.subscription_path( self.project, self.subscription_name ) self._subscription_client = s def subscribe(self, callback: Callable[[Any], Any]): _LOGGER.info(f'Starting to listen to {self.subscription_name}') return self._subscription_client.subscribe(self._subscription_path, callback) def ack(self, message): message.ack() PKzrN,happyly/google_pubsub/high_level/__init__.pyfrom .simple import ( # noqa: F401 GoogleSimpleSender, GoogleSimpleReceiver, GoogleSimpleReceiveAndReply, GoogleReceiveAndReplyComponent, ) from .with_cache import GoogleCachedReceiveAndReply, GoogleCachedReceiver # noqa: F401 PKzrNw *happyly/google_pubsub/high_level/simple.pyimport marshmallow from happyly.google_pubsub.deserializers import JSONDeserializerWithRequestIdRequired from happyly.google_pubsub.publishers import GooglePubSubPublisher from happyly.google_pubsub.serializers import BinaryJSONSerializer from happyly.google_pubsub.subscribers import GooglePubSubSubscriber from happyly.handling import Handler from happyly.listening import Listener from happyly.listening.executor import Executor class GoogleSimpleSender(Executor[None, GooglePubSubPublisher]): def __init__( self, handler: Handler, output_schema: marshmallow.Schema, to_topic: str, project: str, ): publisher = GooglePubSubPublisher( project=project, publish_all_to=to_topic, serializer=BinaryJSONSerializer(schema=output_schema), ) handler = handler super().__init__(publisher=publisher, handler=handler) class GoogleSimpleReceiver(Listener[JSONDeserializerWithRequestIdRequired, None]): def __init__( self, handler: Handler, input_schema: marshmallow.Schema, from_subscription: str, project: str, ): subscriber = GooglePubSubSubscriber( project=project, subscription_name=from_subscription ) handler = handler deserializer = JSONDeserializerWithRequestIdRequired(schema=input_schema) super().__init__( subscriber=subscriber, handler=handler, deserializer=deserializer ) class GoogleSimpleReceiveAndReply( Listener[JSONDeserializerWithRequestIdRequired, GooglePubSubPublisher] ): def __init__( self, handler: Handler, input_schema: marshmallow.Schema, from_subscription: str, output_schema: marshmallow.Schema, to_topic: str, project: str, ): subscriber = GooglePubSubSubscriber( project=project, subscription_name=from_subscription ) handler = handler deserializer = JSONDeserializerWithRequestIdRequired(schema=input_schema) publisher = GooglePubSubPublisher( project=project, publish_all_to=to_topic, serializer=BinaryJSONSerializer(schema=output_schema), ) super().__init__( handler=handler, deserializer=deserializer, subscriber=subscriber, publisher=publisher, ) # for compatibility GoogleReceiveAndReplyComponent = GoogleSimpleReceiveAndReply PKzrNDB$$.happyly/google_pubsub/high_level/with_cache.pyimport marshmallow from happyly.caching.cacher import Cacher from happyly.caching.mixins import CacheByRequestIdMixin from happyly.handling import Handler from happyly.google_pubsub.high_level.simple import ( GoogleSimpleReceiveAndReply, GoogleSimpleReceiver, ) class GoogleCachedReceiveAndReply(CacheByRequestIdMixin, GoogleSimpleReceiveAndReply): def __init__( self, handler: Handler, input_schema: marshmallow.Schema, from_subscription: str, output_schema: marshmallow.Schema, to_topic: str, project: str, cacher: Cacher, ): GoogleSimpleReceiveAndReply.__init__( self, handler, input_schema, from_subscription, output_schema, to_topic, project, ) CacheByRequestIdMixin.__init__(self, cacher) class GoogleCachedReceiver(CacheByRequestIdMixin, GoogleSimpleReceiver): def __init__( self, handler: Handler, input_schema: marshmallow.Schema, from_subscription: str, project: str, cacher: Cacher, ): GoogleSimpleReceiver.__init__( self, handler, input_schema, from_subscription, project ) CacheByRequestIdMixin.__init__(self, cacher) PKcqNLx{{happyly/handling/__init__.pyfrom .handler import Handler # noqa: F401 from .handling_result import HandlingResult, HandlingResultStatus # noqa: F401 PKcqNhappyly/handling/handler.pyfrom abc import ABC, abstractmethod from typing import Mapping, Any from .types import ZeroToManyParsedMessages from .handling_result import HandlingResult _no_base_impl = NotImplementedError('No default implementation in base Handler class') class Handler(ABC): """ A class containing logic to handle a parsed message. """ @abstractmethod def handle(self, message: Mapping[str, Any]) -> ZeroToManyParsedMessages: """ Applies logic using a provided message, optionally gives back one or more results. Each result consists of message attributes which can be serialized and sent. When fails, calls `on_handling_failed` :param message: A parsed message as a dictionary of attributes :return: None if no result is extracted from handling, a dictionary of attributes for single result or a list of dictionaries if handling provides multiple results """ raise _no_base_impl @abstractmethod def on_handling_failed( self, message: Mapping[str, Any], error: Exception ) -> ZeroToManyParsedMessages: """ Applies fallback logic using a provided message when `handle` fails, optionally gives back one or more results. Enforces users of `Handler` class to provide explicit strategy for errors :param message: A parsed message as a dictionary of attributes :param error: Error raised by `handle` :return: None if no result is extracted from handling, a dictionary of attributes for single result or a list of dictionaries if handling provides multiple results """ raise _no_base_impl def __call__(self, message: Mapping[str, Any]) -> HandlingResult: try: result_data = self.handle(message) return HandlingResult.ok(result_data) except Exception as e: result_data = self.on_handling_failed(message, e) return HandlingResult.err(result_data) PKcqN#happyly/handling/handling_result.pyfrom enum import Enum from attr import attrs from .types import ZeroToManyParsedMessages class HandlingResultStatus(Enum): OK = 'OK' ERR = 'ERR' @attrs(auto_attribs=True, frozen=True) class HandlingResult: status: HandlingResultStatus data: ZeroToManyParsedMessages @classmethod def ok(cls, data): return HandlingResult(status=HandlingResultStatus.OK, data=data) @classmethod def err(cls, data): return HandlingResult(status=HandlingResultStatus.ERR, data=data) PK~aNfhappyly/handling/types.pyfrom typing import Mapping, Any, Union, List _ParsedMessage = Mapping[str, Any] ZeroToManyParsedMessages = Union[_ParsedMessage, List[_ParsedMessage], None] PKcqNQZZhappyly/listening/__init__.pyfrom .listener import Listener # noqa: F401 from .executor import Executor # noqa: F401 PKzrN$/E{happyly/listening/executor.pyimport logging from typing import Mapping, Any, Optional, TypeVar, Generic from attr import attrs from happyly.handling import Handler, HandlingResult from happyly.serialization.deserializer import Deserializer from happyly.pubsub import Publisher _LOGGER = logging.getLogger(__name__) D = TypeVar("D", bound=Deserializer) P = TypeVar("P", bound=Publisher) @attrs(auto_attribs=True) class Executor(Generic[D, P]): handler: Handler deserializer: Optional[D] = None publisher: Optional[P] = None def on_received(self, message: Any): pass def on_deserialized(self, original_message: Any, parsed_message: Mapping[str, Any]): pass def on_deserialization_failed(self, message: Any, error: Exception): pass def on_handled( self, original_message: Any, parsed_message: Mapping[str, Any], result: HandlingResult, ): pass def on_published( self, original_message: Any, parsed_message: Optional[Mapping[str, Any]], result: HandlingResult, ): pass def on_publishing_failed( self, original_message: Any, parsed_message: Optional[Mapping[str, Any]], result: HandlingResult, error: Exception, ): pass def _when_parsing_succeeded(self, original: Any, parsed: Mapping[str, Any]): result = self.handler(parsed) _LOGGER.info(f"Message handled, status {result.status}") self.on_handled(original_message=original, parsed_message=parsed, result=result) if self.publisher is not None: self._try_publish(original, parsed, result) def _when_parsing_failed(self, message: Any, error: Exception): if self.publisher is None: return assert self.deserializer is not None result = self.deserializer.build_error_result(message, error) handling_result = HandlingResult.err(result) self._try_publish(original=message, parsed=None, result=handling_result) def _try_publish( self, original: Any, parsed: Optional[Mapping[str, Any]], result: HandlingResult ): assert self.publisher is not None try: self.publisher.publish_result(result) _LOGGER.info(f"Published result:\n{result}") self.on_published( original_message=original, parsed_message=parsed, result=result ) except Exception as e: _LOGGER.exception("Failed to publish result:\n{result}") self.on_publishing_failed( original_message=original, parsed_message=parsed, result=result, error=e ) def _run_no_deser(self, message: Optional[Any]): if message is not None: raise ValueError("No deserializer to parse non-empty message.") if message is None: self._when_parsing_succeeded(original=None, parsed={}) def _after_on_received(self, message: Optional[Any]): try: assert self.deserializer is not None parsed = self.deserializer.deserialize(message) except Exception as e: _LOGGER.exception( f"Was not able to deserialize the following message\n{message}" ) self.on_deserialization_failed(message, error=e) self._when_parsing_failed(message, error=e) else: _LOGGER.debug( f"Message successfully deserialized into attributes:\n {parsed}" ) self.on_deserialized(message, parsed) self._when_parsing_succeeded(original=message, parsed=parsed) def _run_with_deser(self, message: Optional[Any]): _LOGGER.info(f"Received message:\n {message}") self.on_received(message) self._after_on_received(message) def run(self, message: Optional[Any] = None): if self.deserializer is None: self._run_no_deser(message) else: self._run_with_deser(message) PKzrN9dJhappyly/listening/listener.pyfrom typing import Any, TypeVar, Optional from attr import attrs, attrib from happyly.pubsub import Publisher from happyly.pubsub.subscriber import Subscriber from happyly.serialization import Deserializer from .executor import Executor D = TypeVar("D", bound=Deserializer) P = TypeVar("P", bound=Publisher) @attrs(auto_attribs=True) class Listener(Executor[D, P]): subscriber: Subscriber = attrib(kw_only=True) def on_acknowledged(self, message: Any): pass def _after_on_received(self, message: Optional[Any]): self.subscriber.ack(message) self.on_acknowledged(message) super()._after_on_received(message) def start_listening(self): return self.subscriber.subscribe(callback=self.run) PKcqNf7``happyly/pubsub/__init__.pyfrom .publisher import Publisher # noqa: F401 from .subscriber import Subscriber # noqa: F401 PKcqNO[  happyly/pubsub/publisher.pyfrom abc import ABC, abstractmethod from typing import Optional, Mapping, Any, List from happyly.handling import HandlingResult, HandlingResultStatus from happyly.serialization.serializer import Serializer class Publisher(ABC): def __init__( self, serializer: Serializer, publish_all_to: Optional[str] = None, publish_success_to: Optional[str] = None, publish_failure_to: Optional[str] = None, ): self._serializer = serializer if publish_all_to is not None and all( p is None for p in [publish_success_to, publish_failure_to] ): self.publish_success_to: str = publish_all_to self.publish_failure_to: str = publish_all_to elif ( publish_success_to is not None and publish_failure_to is not None ) and publish_all_to is None: self.publish_success_to: str = publish_success_to self.publish_failure_to: str = publish_failure_to else: raise ValueError( """Provide "publish_all_to" only, or else provide both "publish_success_to" and "publish_failure_to""" ) @abstractmethod def publish_message(self, serialized_message: Any, to: str): raise NotImplementedError("No default implementation in base Publisher class") def _get_destination(self, status: HandlingResultStatus): if status == HandlingResultStatus.OK: return self.publish_success_to elif status == HandlingResultStatus.ERR: return self.publish_failure_to else: raise ValueError(f"Unknown status {status}") def _publish_serialized(self, data: Mapping[str, Any], to: str): serialized = self._serializer.serialize(data) self.publish_message(serialized, to) def publish_result(self, result: HandlingResult): data = result.data if data is None: return destination = self._get_destination(result.status) if isinstance(data, Mapping): self._publish_serialized(data, to=destination) elif isinstance(data, List): for item in data: self._publish_serialized(item, to=destination) else: raise ValueError("Invalid data structure") PKcqN  happyly/pubsub/subscriber.pyfrom abc import ABC, abstractmethod from typing import Callable, Any class Subscriber(ABC): @abstractmethod def subscribe(self, callback: Callable[[Any], Any]): raise NotImplementedError @abstractmethod def ack(self, message): raise NotImplementedError PKcqN]5$ff!happyly/serialization/__init__.pyfrom .deserializer import Deserializer # noqa: F401 from .serializer import Serializer # noqa: F401 PKcqNJ%happyly/serialization/deserializer.pyfrom abc import ABC, abstractmethod from typing import Mapping, Any _not_impl = NotImplementedError('No default implementation in base Deserializer class') class Deserializer(ABC): @abstractmethod def deserialize(self, message: Any) -> Mapping[str, Any]: raise _not_impl @abstractmethod def build_error_result(self, message: Any, error: Exception) -> Mapping[str, Any]: raise _not_impl PKcqNe,,#happyly/serialization/serializer.pyfrom abc import ABC, abstractmethod from typing import Mapping, Any _no_default = NotImplementedError('No default implementation in base Serializer class') class Serializer(ABC): @abstractmethod def serialize(self, message_attributes: Mapping[str, Any]) -> Any: raise _no_default PKYaNk))happyly-0.2.0.dist-info/LICENSEMIT License Copyright (c) 2019 equeumco Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HPOhappyly-0.2.0.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UrPK!H  happyly-0.2.0.dist-info/METADATAV[o8~ϯ8J7`5в ZQJEˢU"N&esLA{-;߹ Se;9:ni2=w. JVpc\Ī{ 4%u^lOM_?}m3Y Qγb)ƴJUv‘_їpp-fxEϥWzq,$ Aw!odtcpS8o\8:h θn#%vz~"կ?<S&x~+]ϕ2,[8HOyݾRr'3 ;uCd (º0]#W{H4: ԑjSn6+*e*,^+b0NrŤtEgA0oD]t}2LpqǣsٷwR3i ;XY̖,=2J׎ yY"? 1C޲ae|PcFPjk6p"զޚwB`"PsFO]fZjA2xKTFv`Q"WkɪK>|XjDQ `dA+Bsѱ˿R嬹$pRSL In#>uNzи=^qBi;<OiywAW 5pE $12;a+kp ]iT@z  Gt+G%wTK"G?ꣵQ{o'{5o( &@V#%-E^G煺)dJ֕1;E޸r+E/='l%6Çp灷PK!H9J^\ happyly-0.2.0.dist-info/RECORDɶ8} $ âf`3qa36|};I'q=+iu%UjESL[10{CRZ%y/C~0f[KCtlC`wJQE}&kݍ#X0qbYK&.d{G<7 !ok2py.խup(̳G},$)Av<0X_@'^L kBi?} {gq(OP ͪp^>ːnյԅ7YVI&:#:=;]B Cq2&CTŖ 6 fq=p " iv+t݁yqבryPoRc};bbI9s!-VNjr(gSކdHb?d0uFֵfoZV(RǶti{w$+! CeDAo:DZYN<(u9a٩\R@ws".tnA%|#) S DEM7v^d_%KR}V2]qљRT{$sĕ2z*3R_8c,J^NЩ|j" uL UO |uFbd׷$zV9G5!;W 1:*NM&QGS6F\CI6RVڋ ;m a2ھAD\sLqw9749xY2`z'8GEH`<.# F`_/ϩ4E_e/cT> V/&gtRtz#x?ՓSx7LQ be~+ v[_:3Aw׵USoR=DkIp92F[XYE PKM{rN"]HIIhappyly/__init__.pyPKzrNzhappyly/caching/__init__.pyPKzrNy_happyly/caching/cacher.pyPKzrN##happyly/caching/mixins.pyPKzrN! happyly/google_pubsub/__init__.pyPK\lNx@((& happyly/google_pubsub/deserializers.pyPKcqN#thappyly/google_pubsub/publishers.pyPKzrNLE55%happyly/google_pubsub/redis_cacher.pyPKaN%v$happyly/google_pubsub/serializers.pyPKcqN47$happyly/google_pubsub/subscribers.pyPKzrN,happyly/google_pubsub/high_level/__init__.pyPKzrNw *happyly/google_pubsub/high_level/simple.pyPKzrNDB$$.&'happyly/google_pubsub/high_level/with_cache.pyPKcqNLx{{,happyly/handling/__init__.pyPKcqNK-happyly/handling/handler.pyPKcqN#g5happyly/handling/handling_result.pyPK~aNf7happyly/handling/types.pyPKcqNQZZ8happyly/listening/__init__.pyPKzrN$/E{9happyly/listening/executor.pyPKzrN9dJIhappyly/listening/listener.pyPKcqNf7``;Lhappyly/pubsub/__init__.pyPKcqNO[  Lhappyly/pubsub/publisher.pyPKcqN  Vhappyly/pubsub/subscriber.pyPKcqN]5$ff!uWhappyly/serialization/__init__.pyPKcqNJ%Xhappyly/serialization/deserializer.pyPKcqNe,,#Zhappyly/serialization/serializer.pyPKYaNk))q[happyly-0.2.0.dist-info/LICENSEPK!HPO_happyly-0.2.0.dist-info/WHEELPK!H  b`happyly-0.2.0.dist-info/METADATAPK!H9J^\ ehappyly-0.2.0.dist-info/RECORDPK Bk