PKJlN&%sanic_sse/__init__.py""" Sanic sse package that adds Server-Sent Events compability to sanic application. Author: Innokenty Lebedev """ from .sse import Sse __version__ = "0.2.1" __all__ = ["Sse"] PKJlNGi  sanic_sse/pub_sub.py"""PubSub module This module contains PubSub class that implement publish/subscriber protocol """ import asyncio import uuid from typing import Dict class _StopMessage: # pylint: disable=too-few-public-methods pass class PubSub: """ Implementation of publish/subscriber protocol """ def __init__(self): self._channels: Dict[str, asyncio.Queue] = {} def publish_nowait(self, data: str, channel_id: str = None): """ Publish data to all subscribers or to channel with provided channel_id. This call is not blocking. :param str data: The data to publush :param str channel_id: If given then data will be send only to channel with that id """ if channel_id is not None: self._channels[channel_id].put_nowait(data) else: asyncio.gather(*[channel.put(data) for channel in self._channels.values()]) async def publish(self, data: str, channel_id: str = None): """ Publish data to all subscribers or to channel with provided channel_id. This call is blocking. :param str data: The data to publush :param str channel_id: If given then data will be send only to channel with that id """ if channel_id is not None: await self._channels[channel_id].put(data) else: await asyncio.gather( *[channel.put(data) for channel in self._channels.values()] ) def register(self, channel_id: str = None): """ Register new subscriber Return identifier of subscriber (str) """ if channel_id is None: channel_id = str(uuid.uuid4()) if channel_id in self._channels: raise ValueError(f"Given channel id {channel_id} is already in use") self._channels[channel_id] = asyncio.Queue() return channel_id def delete(self, channel_id: str): """ Delete subscriber by given channel_id :param str channel_id: Identifier of subscriber """ try: del self._channels[channel_id] except KeyError: return False return True async def get(self, channel_id: str): """ Return data for given subscriber. This call is blocking. :param str channel_id: Identifier of subscriber Return received data (str) """ data = await self._channels[channel_id].get() if isinstance(data, _StopMessage): self.delete(channel_id) raise ValueError("Stop message received") return data def task_done(self, channel_id): """ Notify that current data was processed :param str channel_id: Identifier of subscriber """ self._channels[channel_id].task_done() async def close(self): """ Close all subscribers """ await self.publish(_StopMessage()) def size(self): """ Return count of subscribers """ return len(self._channels) PKJlN2FW!yysanic_sse/sse.py"""Sse module. This module add sse compability to sanic app """ import re import io import asyncio import contextlib import inspect from http import HTTPStatus from sanic import Sanic from sanic.response import stream from sanic.exceptions import abort from .pub_sub import PubSub # pylint: disable=bad-continuation class Sse: """ A :class: that knows how to publish, subscribe to, and stream server-sent events. """ _DEFAULT_PING_INTERVAL = 15 _DEFAULT_SEPARATOR = "\r\n" _LINE_SEP_EXPR = re.compile(r"\r\n|\r|\n") _DEFAULT_URL = "/sse" _HEADERS = {"Cache-Control": "no-cache"} def __init__( # type: ignore self, app: Sanic = None, url: str = _DEFAULT_URL, ping_interval: int = _DEFAULT_PING_INTERVAL, before_request_func=None, ): """ Application initialization :param `sanic.Sanic` app: Sanic application :param str url: sse event url :param int ping_interval: interval of ping message """ self._ping_task = None self._before_request = None if app is not None: self.init_app(app, url, ping_interval, before_request_func) async def _ping(self): # periodically send ping to the browser. Any message that # starts with ":" colon ignored by a browser and could be used # as ping message. while True: await asyncio.sleep(self._ping_interval) await self._pubsub.publish( ": ping{0}{0}".format(self._DEFAULT_SEPARATOR).encode("utf-8") ) @staticmethod def _prepare(data, event_id=None, event=None, retry=None): buffer = io.StringIO() if event_id is not None: buffer.write(Sse._LINE_SEP_EXPR.sub("", f"id: {event_id}")) buffer.write(Sse._DEFAULT_SEPARATOR) if event is not None: buffer.write(Sse._LINE_SEP_EXPR.sub("", f"event: {event}")) buffer.write(Sse._DEFAULT_SEPARATOR) for chunk in Sse._LINE_SEP_EXPR.split(data): buffer.write(f"data: {chunk}") buffer.write(Sse._DEFAULT_SEPARATOR) if retry is not None: if not isinstance(retry, int): raise TypeError("retry argument must be int") buffer.write(f"retry: {retry}") buffer.write(Sse._DEFAULT_SEPARATOR) buffer.write(Sse._DEFAULT_SEPARATOR) return buffer.getvalue().encode("utf-8") async def send( # pylint: disable=too-many-arguments self, data: str, channel_id: str = None, event_id: str = None, event: str = None, retry: int = None, ): """Send data using EventSource protocol. This call is blocking :param str data: The data field for the message. :param str event_id: The event ID to set the EventSource object's last event ID value to. :param str event: The event's type. If this is specified, an event will be dispatched on the browser to the listener for the specified event name; the web site would use addEventListener() to listen for named events. The default event type is "message". :param int retry: The reconnection time to use when attempting to send the event. [What code handles this?] This must be an integer, specifying the reconnection time in milliseconds. If a non-integer value is specified, the field is ignored. """ data = self._prepare(data, event_id, event, retry) await self._pubsub.publish(data, channel_id) def send_nowait( # pylint: disable=too-many-arguments self, data: str, channel_id: str = None, event_id: str = None, event: str = None, retry: int = None, ): """Send data using EventSource protocol. This call is not blocking. :param str data: The data field for the message. :param str event_id: The event ID to set the EventSource object's last event ID value to. :param str event: The event's type. If this is specified, an event will be dispatched on the browser to the listener for the specified event name; the web site would use addEventListener() to listen for named events. The default event type is "message". :param int retry: The reconnection time to use when attempting to send the event. [What code handles this?] This must be an integer, specifying the reconnection time in milliseconds. If a non-integer value is specified, the field is ignored. """ data = self._prepare(data, event_id, event, retry) self._pubsub.publish_nowait(data, channel_id) def set_before_request_callback(self, func): """ Set function for callback before sse request. It can be used for authorizations purpose :param callable func: coroutine function with one parameter - request """ if not callable(func): raise TypeError(f"{func} should be callable") if not inspect.iscoroutinefunction(func): raise TypeError(f"{func} should be coroutine function") if len(inspect.signature(func).parameters) != 1: raise ValueError(f"{func} should get only one parameter - request") self._before_request = func def init_app( self, app: Sanic, url: str = _DEFAULT_URL, ping_interval: int = _DEFAULT_PING_INTERVAL, before_request_func=None, ): """ Application initialization :param `sanic.Sanic` app: Sanic application :param str url: sse event url :param int ping_interval: interval of ping message """ self._url = url self._ping_interval = ping_interval if before_request_func is not None: self.set_before_request_callback(before_request_func) self._pubsub = PubSub() @app.listener("after_server_start") def _on_start(_, loop): self._ping_task = loop.create_task(self._ping()) @app.listener("before_server_stop") async def _on_stop(_, __): self._ping_task.cancel() with contextlib.suppress(asyncio.CancelledError): await self._ping_task await self._pubsub.close() app.sse_send = self.send app.sse_send_nowait = self.send_nowait @app.route(self._url, methods=["GET"]) async def _(request): if self._before_request is not None: await self._before_request(request) channel_id = request.args.get("channel_id", None) try: channel_id = self._pubsub.register(channel_id=channel_id) except ValueError as exc: abort(HTTPStatus.BAD_REQUEST, str(exc)) async def streaming_fn(response): try: while True: try: data = await self._pubsub.get(channel_id) except ValueError: break await response.write(data) self._pubsub.task_done(channel_id) finally: self._pubsub.delete(channel_id) return stream( streaming_fn, headers=self._HEADERS, content_type="text/event-stream" ) PKJlN<<!sanic_sse-0.2.1.dist-info/LICENSEThe MIT License (MIT) Copyright (c) 2018 Innokenty Lebedev Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HPOsanic_sse-0.2.1.dist-info/WHEEL HM K-*ϳR03rOK-J,/RH,szd&Y)r$[)T&UrPK!Hjm "sanic_sse-0.2.1.dist-info/METADATAWmo6_)`tZxpMi΂-(TZm.]eIJ"m8sG`%'irHY'0)NzAYSUL:C f-[qjRVθp=qi> UU(mU?eu]YUq &tmmm&iv,MMw[;9ߞ_\ƮS)Sg| 1('^ W0yif|ق.$rS&ע=_)ixO60n UɌK!V+ͪJ=cr <Xw =mE ESzTZmxN)[QS.sgoxj=ݹo%|}}4V[82O/vH&KBFPst%1<ksFMdh-ͪo/T8Q1O( @9<r%Tߝ=7ƛ4èk䢣LR;Q,Qzr9Mzi:9ɼж$ hu;12\.T1  jq)&( 6%!M/ sc(3Ԉ.9)nN-sCA"@Zldg\ڌ.8CCa`j@C=Z(榵 6V 3®qn/gUre~)C\a 9B H>NUc(JA U&ͣ${Ra15(s%AA0#  k }oӷ=.K_g<{squ^TA#{PFK7GFFYF?l(uI8AY.ejhFzx888|U wHG<Tr'3{wH{p̃v{sy0OgY!c[g׽ P0*=|ܥ~8f|X€5Ë\E4Ϲj o/K"BӢo-?/>j: OWjԹ 3^H|Q [K:ˮEò[O?IXa&ngn21@Șz8nn+1^ڟ#\ 8W5%oMBPK!H,T sanic_sse-0.2.1.dist-info/RECORD}=0~Kp-2(2,^9\Ԣ|>t \ ^vGv*SHu&!{Esmk^Ѩ8P0_4 ݢ aybD]n+RSbNFtb<;vc~X}[[x1%RͣƇêε^nC뚓StPJ$аf~ PKJlN&%sanic_sse/__init__.pyPKJlNGi  sanic_sse/pub_sub.pyPKJlN2FW!yy5 sanic_sse/sse.pyPKJlN<<!*sanic_sse-0.2.1.dist-info/LICENSEPK!HPOW/sanic_sse-0.2.1.dist-info/WHEELPK!Hjm "/sanic_sse-0.2.1.dist-info/METADATAPK!H,T  6sanic_sse-0.2.1.dist-info/RECORDPK7