PK!,--channels_graphql_ws/__init__.py# # coding: utf-8 # Copyright (c) 2018 DATADVANCE # # Permission is hereby granted, free of charge, to any person obtaining # a copy of this software and associated documentation files (the # "Software"), to deal in the Software without restriction, including # without limitation the rights to use, copy, modify, merge, publish, # distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so, subject to # the following conditions: # # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. # IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY # CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """Websocket GraphQL server with subscriptions. Django Channels based WebSocket GraphQL server with Graphene-like subscriptions. 
""" from .graphql_ws import Subscription from .graphql_ws import GraphqlWsConsumer PK!iLzz!channels_graphql_ws/graphql_ws.py# # coding: utf-8 # Copyright (c) 2018 DATADVANCE # # Permission is hereby granted, free of charge, to any person obtaining # a copy of this software and associated documentation files (the # "Software"), to deal in the Software without restriction, including # without limitation the rights to use, copy, modify, merge, publish, # distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so, subject to # the following conditions: # # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. # IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY # CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, # TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """GraphQL over WebSockets implementation with subscriptions. This module contains implementation of GraphQL WebSocket protocol. The implementation bases on the Graphene and the Channels 2. The `Subscription` class itself is a "creative" copy of `Mutation` class from the Graphene (`graphene/types/mutation.py`). The `GraphqlWsConsumer` is a Channels WebSocket consumer which maintains WebSocket connection with the client. Implementation assumes that client uses the protocol implemented by the library `subscription-transport-ws` (which is used by Apollo). """ # NOTE: The motivation is that currently there is no viable Python-based # GraphQL subscriptions implementation out of the box. 
# Hopefully there is a promising GraphQL WS
# https://github.com/graphql-python/graphql-ws library by the Graphene
# authors. In particular this pull request
# https://github.com/graphql-python/graphql-ws/pull/9 gives a hope that
# implementation in the current file can be replaced with GraphQL WS one
# day.

# NOTE: Links based on which this functionality is implemented:
# - Protocol description:
#   https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
#   https://github.com/apollographql/subscriptions-transport-ws/blob/master/src/message-types.ts
# - ASGI specification for WebSockets:
#   https://github.com/django/asgiref/blob/master/specs/www.rst#websocket
# - GitHubGist with the root of inspiration:
#   https://gist.github.com/tricoder42/af3d0337c1b33d82c1b32d12bd0265ec

import asyncio
import collections
import traceback
import types

import asgiref.sync
import channels.db
import channels.generic.websocket as ch_websocket
import channels.layers
import graphene
import graphene.types.objecttype
import graphene.types.utils
import graphene.utils.get_unbound_function
import graphene.utils.props
import graphene_django.views
import graphql.error
from namedlist import namedlist
import rx


class Subscription(graphene.ObjectType):
    """Subscription type definition.

    Subclass this class to define a GraphQL subscription. The class
    works with `GraphqlWsConsumer` which maintains a WebSocket
    connection with the client.

    The subclass specifies the following methods.
        publish: Called when subscription triggers. Method signature is
            the same as in other GraphQL "resolver" methods, except
            that the `self` argument holds the payload specified in the
            `broadcast` invocation which triggered the subscription.
            Required.
        subscribe: Method called when client subscribes. Method
            signature is the same as in other GraphQL "resolver"
            methods but it must return names of subscription groups to
            put subscription into. Optional.
        unsubscribed: Called when user unsubscribes. Method signature
            is the same as in other GraphQL "resolver" methods.
            Optional.

    The methods listed above receive the "standard" set of GraphQL
    resolver arguments (`self, info, ...`). The `info` field has
    `context` which can be used to transmit some useful payload between
    these methods. For example if `subscribe` sets `info.context.zen=42`
    then `publish` will have access to this value `info.context.zen`.

    Static methods of subscription subclass:
        broadcast: Call this method to notify all subscriptions in the
            group.
        unsubscribe: Call this method to stop all subscriptions in the
            group.
    """

    # ------------------------------------------------------------ OVERWRITE IN SUBCLASS

    def publish(self, info, *args, **kwds):
        """GraphQL resolver for subscription notifications.

        Overwrite this to "resolve" subscription query. This method is
        invoked each time subscription "triggers".

        Args:
            self: The value provided as `payload` when `broadcast`
                was called.
            info: The value of `info.context` is a Channels websocket
                context with all the connection information.
            args, kwds: Values of the GraphQL subscription inputs.
        Returns:
            The same that any Graphene resolver returns.
        Raises:
            NotImplementedError: Always, unless overwritten.
        """
        raise NotImplementedError()

    def subscribe(self, info, *args, **kwds):  # pylint: disable=unused-argument
        """Called when client subscribes.

        Overwrite this to do some extra work when client subscribes and
        to group subscriptions into different subscription groups.

        Args:
            self: Typically `None`.
            info: The value of `info.context` is a Channels websocket
                context with all the connection information.
            args, kwds: Values of the GraphQL subscription inputs.

        Returns:
            The list or tuple of subscription group names this
            subscription instance belongs to. Later the subscription
            will trigger on publishes to any of those groups. If method
            returns None (default behavior) then the subscription is
            only put to the default group (the one which corresponds to
            the `Subscription` subclass).
        """
        return

    def unsubscribed(self, info, *args, **kwds):
        """Called when client unsubscribes.

        Overwrite to be notified when client unsubscribes.

        Args:
            self: None
            info: The value of `info.context` is a Channels websocket
                context with all the connection information.
            args, kwds: Values of the GraphQL subscription inputs.
        """
        pass

    # --------------------------------------------------- SUBSCRIPTION CONTROL INTERFACE

    @classmethod
    def broadcast(cls, *, group=None, payload=None):
        """Call this method to notify all subscriptions in the group.

        Args:
            group: Name of the subscription group which members must be
                notified. `None` means that all the subscriptions of
                type will be triggered.
            payload: The payload delivered to the `publish` handler as
                the `self` argument.
        """
        # Send the message to the Channels group.
        group_send = asgiref.sync.async_to_sync(
            channels.layers.get_channel_layer().group_send
        )
        group = cls._group_name(group)
        group_send(
            group=group,
            message={"type": "broadcast", "group": group, "payload": payload},
        )

    @classmethod
    def unsubscribe(cls, *, group=None):
        """Call this method to stop all subscriptions in the group.

        Args:
            group: Name of the subscription group which members must be
                unsubscribed. `None` means that all the clients of the
                subscription will be unsubscribed.
        """
        # Send the message to the Channels group.
        group_send = asgiref.sync.async_to_sync(
            channels.layers.get_channel_layer().group_send
        )
        group = cls._group_name(group)
        group_send(group=group, message={"type": "unsubscribe", "group": group})

    @classmethod
    def Field(
        cls, name=None, description=None, deprecation_reason=None, required=False
    ):
        """Represent subscription as a field to "deploy" it."""
        return graphene.Field(
            cls._meta.output,
            args=cls._meta.arguments,
            resolver=cls._meta.resolver,
            name=name,
            description=description,
            deprecation_reason=deprecation_reason,
            required=required,
        )

    @classmethod
    def __init_subclass_with_meta__(
        cls,
        subscribe=None,
        publish=None,
        unsubscribed=None,
        output=None,
        arguments=None,
        _meta=None,
        **options,
    ):  # pylint: disable=arguments-differ
        """Prepare subscription on subclass creation.

        This method is invoked by the superclass `__init_subclass__`.
        It is needed to process class fields, `Meta` and inheritance
        parameters. This is genuine Graphene approach.
        """
        if not _meta:
            _meta = SubscriptionOptions(cls)

        output = output or getattr(cls, "Output", None)
        # Collect fields if output class is not explicitly defined.
        fields = {}
        if not output:
            fields = collections.OrderedDict()
            for base in reversed(cls.__mro__):
                fields.update(
                    graphene.types.utils.yank_fields_from_attrs(
                        base.__dict__, _as=graphene.Field
                    )
                )
            output = cls

        if not arguments:
            input_class = getattr(cls, "Arguments", None)
            if input_class:
                arguments = graphene.utils.props.props(input_class)
            else:
                arguments = {}

        # Get `publish`, `subscribe`, and `unsubscribed` handlers.
        subscribe = subscribe or getattr(cls, "subscribe")
        publish = publish or getattr(cls, "publish")
        assert publish is not Subscription.publish, (
            f"Subscription `{cls.__qualname__}` does not define a "
            "method `publish`! All subscriptions must define "
            "`publish` which processes a GraphQL query!"
        )
        unsubscribed = unsubscribed or getattr(cls, "unsubscribed")

        if _meta.fields:
            _meta.fields.update(fields)
        else:
            _meta.fields = fields

        # Auxiliary alias.
        get_function = graphene.utils.get_unbound_function.get_unbound_function

        _meta.arguments = arguments
        _meta.output = output
        _meta.resolver = get_function(cls._subscribe)
        _meta.subscribe = get_function(subscribe)
        _meta.publish = get_function(publish)
        _meta.unsubscribed = get_function(unsubscribed)

        super().__init_subclass_with_meta__(_meta=_meta, **options)

    @classmethod
    def _subscribe(cls, obj, info, *args, **kwds):  # pylint: disable=unused-argument
        """Subscription request received.

        This is called by the Graphene when a client subscribes.
        """
        # Extract function which associates the callback with the groups
        # and remove it from the context so extra field does not pass to
        # the `subscribe` method of a subclass.
        register = info.context.register
        del info.context.register

        # Attach current subscription to the group corresponding to the
        # concrete class. This allows to trigger all the subscriptions
        # of the current type, by invoking `broadcast` without setting
        # the `group` argument.
        groups = [cls._group_name()]

        # Invoke the subclass-specified `subscribe` method to get the
        # groups subscription must be attached to.
        subclass_groups = cls._meta.subscribe(obj, info, *args, **kwds)
        subclass_groups = subclass_groups or []
        assert isinstance(
            subclass_groups, (list, tuple)
        ), "Subscribe must return a list or a tuple of group names!"

        groups += [cls._group_name(group) for group in subclass_groups]

        # Register callbacks to call `publish` and `unsubscribed`.
        # Function `register` provides an observable which must
        # be returned from here, cause that is what GraphQL expects from
        # the subscription "resolver" functions.
        def publish_callback(payload):
            """Call `publish` with the payload as `self`."""
            return cls._meta.publish(payload, info, *args, **kwds)

        def unsubscribed_callback():
            """Call `unsubscribed` with `None` as `self`."""
            return cls._meta.unsubscribed(None, info, *args, **kwds)

        return register(groups, publish_callback, unsubscribed_callback)

    @classmethod
    def _group_name(cls, group=None):
        """Group name based on the name of the subscription class."""
        name = f"SUBSCRIPTION-{cls.__module__}.{cls.__qualname__}"
        if group is not None:
            name += "." + group
        return name


class SubscriptionOptions(graphene.types.objecttype.ObjectTypeOptions):
    """Options stored in the Subscription's `_meta` field."""

    arguments = None
    output = None
    subscribe = None
    publish = None
    # Assigned by `Subscription.__init_subclass_with_meta__` alongside
    # the handlers above, so it is declared here as well.
    unsubscribed = None


class GraphqlWsConsumer(ch_websocket.AsyncJsonWebsocketConsumer):
    """Channels consumer for the WebSocket GraphQL backend.

    NOTE: Each instance of this class maintains one WebSocket
          connection to a single client.

    NOTE: The class made async only to send keepalive messages using
          eventloop. I am not sure it is brilliant idea though.

    This class implements the WebSocket-based GraphQL protocol used by
    `subscriptions-transport-ws` library (used by Apollo):
    https://github.com/apollographql/subscriptions-transport-ws/blob/master/PROTOCOL.md
    """

    # ------------------------------------------------------------ OVERWRITE IN SUBCLASS

    # Overwrite this in the subclass to specify the GraphQL schema which
    # processes GraphQL queries.
    schema = None

    # The interval to send keepalive messages to the clients.
    send_keepalive_every = None

    async def on_connect(self, payload):
        """Called after CONNECTION_INIT message from client.

        Overwrite to raise an Exception to tell the server
        to reject the connection when it's necessary.

        Args:
            payload: payload from CONNECTION_INIT message.
        """
        pass

    # ------------------------------------------------------------------- IMPLEMENTATION

    # Subscription WebSocket subprotocol.
    GRAPHQL_WS_SUBPROTOCOL = "graphql-ws"

    # Structure that holds subscription information.
    _SubInf = namedlist("_SubInf", ["groups", "op_id", "trigger", "unsubscribed"])

    def __init__(self, *args, **kwargs):
        """Set up the empty subscription registry for this connection."""
        assert self.schema is not None, (
            "An attribute `schema` is not set! Subclasses must specify "
            "the schema which processes GraphQL subscription queries."
        )

        # Registry of active (subscribed) subscriptions.
        self._subscriptions = {}  # {operation id: _SubInf, ...}
        self._sids_by_group = {}  # {group name: [operation id, ...], ...}

        # Task that sends keepalive messages periodically.
        self._keepalive_task = None

        super().__init__(*args, **kwargs)

    # ---------------------------------------------------------- CONSUMER EVENT HANDLERS

    async def connect(self):
        """Handle new WebSocket connection."""
        # Check the subprotocol told by the client.
        #
        # NOTE: In Python 3.6 `scope["subprotocols"]` was a string, but
        # starting with Python 3.7 it is a bytes. This can be a proper
        # change or just a bug in the Channels to be fixed. So let's
        # accept both variants until it becomes clear.
        assert self.GRAPHQL_WS_SUBPROTOCOL in (
            (sp.decode() if isinstance(sp, bytes) else sp)
            for sp in self.scope["subprotocols"]
        ), (
            f"WebSocket client does not request for the subprotocol "
            f"{self.GRAPHQL_WS_SUBPROTOCOL}!"
        )

        # Accept connection with the GraphQL-specific subprotocol.
        await self.accept(subprotocol=self.GRAPHQL_WS_SUBPROTOCOL)

    async def disconnect(
        self, close_code
    ):  # pylint: disable=unused-argument,arguments-differ
        """WebSocket disconnection handler."""
        # Remove itself from Channels groups and clear triggers.
        await self._cleanup()

    async def receive_json(self, content):  # pylint: disable=arguments-differ
        """Process WebSocket message received from the client."""
        # TODO: Spawn a task which asynchronously process the request.
        # TODO: Make task registry in this consumer.
        # TODO: Make tunable which enables/disables serialization.

        # Extract message type based on which we select how to proceed.
        msg_type = content["type"].upper()

        if msg_type == "CONNECTION_INIT":
            # The payload is read with `.get` so that a message without
            # it does not crash the consumer with a `KeyError`.
            await self._on_gql_connection_init(payload=content.get("payload"))

        elif msg_type == "CONNECTION_TERMINATE":
            await self._on_gql_connection_terminate()

        elif msg_type == "START":
            # A missing payload falls through to `_on_gql_start` which
            # reports the problem to the client as an ERROR message.
            await self._on_gql_start(
                operation_id=content["id"], payload=content.get("payload", {})
            )

        elif msg_type == "STOP":
            await self._on_gql_stop(operation_id=content["id"])

        else:
            # A message of unknown type is not guaranteed to carry an
            # `id`, so do not fail while reporting the error.
            await self._send_gql_error(
                content.get("id"), f'Message of unknown type "{msg_type}" received!'
            )

    async def broadcast(self, message):
        """The broadcast message handler.

        Method is called when new `broadcast` message received from the
        Channels group. The message is typically sent by the method
        `Subscription.broadcast`. Here we figure out the group message
        received from and trigger the observable which makes the
        subscription process the query and notify the client.
        """
        group = message["group"]
        payload = message["payload"]

        # The group may be gone already if the last subscription in it
        # was stopped while the message was in flight. Besides,
        # `asyncio.wait` raises `ValueError` when given nothing to wait.
        op_ids = list(self._sids_by_group.get(group, ()))
        if not op_ids:
            return

        # Offload trigger to the thread (in threadpool) cause it may
        # work slowly and do DB operations.
        db_sync_to_async = channels.db.database_sync_to_async
        # NOTE: Coroutines are wrapped into futures explicitly cause
        # newer Python versions do not accept bare coroutines here.
        await asyncio.wait(
            [
                asyncio.ensure_future(
                    db_sync_to_async(self._subscriptions[op_id].trigger)(payload)
                )
                for op_id in op_ids
            ]
        )

    async def unsubscribe(self, message):
        """The unsubscribe message handler.

        Method is called when new `unsubscribe` message received from
        the Channels group. The message is typically sent by the method
        `Subscription.unsubscribe`. Here we figure out the group message
        received from and stop all the active subscriptions in this
        group.
        """
        group = message["group"]

        # Take a snapshot of operation ids cause `_on_gql_stop` modifies
        # the registry. Nothing to do if the group is gone already.
        op_ids = list(self._sids_by_group.get(group, ()))
        if not op_ids:
            return

        # Unsubscribe all active subscriptions current client has in
        # the subscription group `group`.
        await asyncio.wait(
            [asyncio.ensure_future(self._on_gql_stop(sid)) for sid in op_ids]
        )

    # ---------------------------------------------------------- GRAPHQL PROTOCOL EVENTS

    async def _on_gql_connection_init(self, payload):  # pylint: disable=unused-argument
        """Process the CONNECTION_INIT message.

        Start sending keepalive messages if `send_keepalive_every` set.
        Respond with either CONNECTION_ACK or CONNECTION_ERROR message.
        """
        try:
            # Notify subclass a new client is connected.
            await self.on_connect(payload)
        except Exception as exc:
            # Send CONNECTION_ERROR message.
            error_to_dict = graphene_django.views.GraphQLView.format_error
            await self._send_gql_connection_error(error_to_dict(exc))
            # Close the connection.
            await self._cleanup()
            # NOTE: We use the 4000 code because there are two reasons:
            # A) We can not use codes greater than 1000 and less than
            # 3000 because daphne and autobahn do not allow this
            # (see `sendClose` from `autobahn/websocket/protocol.py`
            # and `daphne/ws_protocol.py`).
            # B) https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent
            # So mozilla offers us the following codes:
            # 4000–4999 - Available for use by applications.
            await self.close(code=4000)
        else:
            # Send CONNECTION_ACK message.
            await self._send_gql_connection_ack()
            # If keepalive enabled then send one message immediately and
            # schedule periodic messages.
            if self.send_keepalive_every is not None:

                async def keepalive_sender():
                    """Send keepalive messages periodically."""
                    while True:
                        await asyncio.sleep(self.send_keepalive_every)
                        await self._send_gql_connection_keep_alive()

                self._keepalive_task = asyncio.ensure_future(keepalive_sender())
                # Immediately send keepalive message cause it is
                # required by the protocol description.
                await self._send_gql_connection_keep_alive()

    async def _on_gql_connection_terminate(self):
        """Process the CONNECTION_TERMINATE message."""
        # Remove itself from Channels groups and clear triggers.
        await self._cleanup()

        # Close the connection.
        await self.close(code=1000)

    async def _on_gql_start(self, operation_id, payload):
        """Process the START message.

        This message holds query, mutation or subscription request.
        """
        try:
            if operation_id in self._subscriptions:
                raise graphql.error.GraphQLError(
                    f"Subscription with the given `id={operation_id}` "
                    "already exists! Would you like to unsubscribe first?"
                )

            # Get the message data.
            op_id = operation_id
            query = payload["query"]
            operation_name = payload.get("operationName")
            variables = payload.get("variables", {})

            # Local alias for convenience.
            async_to_sync = asgiref.sync.async_to_sync

            # The subject we will trigger on the `publish` message.
            publishes = rx.subjects.Subject()

            # This function is called by subscription to associate a
            # callback (which publishes notifications) with the groups.
            def register(groups, publish_callback, unsubscribed_callback):
                """Associate publish callback with the groups."""
                # Put subscription information into the registry and
                # start listening to the subscription groups.
                trigger = publishes.on_next
                subinf = self._SubInf(
                    groups=groups,
                    op_id=op_id,
                    trigger=trigger,
                    unsubscribed=unsubscribed_callback,
                )
                self._subscriptions[op_id] = subinf
                for group in groups:
                    self._sids_by_group.setdefault(group, []).append(op_id)
                    async_to_sync(self.channel_layer.group_add)(
                        group, self.channel_name
                    )

                return publishes.map(publish_callback)

            # Create object-like context (like in `Query` or `Mutation`)
            # from the dict-like one provided by the Channels. The
            # subscriptions groups must be set by the subscription using
            # the `register` function.
            context = types.SimpleNamespace(**self.scope)
            context.register = register

            # Process GraphQL request with Graphene. Offload it to the
            # thread cause it may work slowly and do DB operations.
            db_sync_to_async = channels.db.database_sync_to_async
            result = await db_sync_to_async(self.schema.execute)(
                query,
                operation_name=operation_name,
                variable_values=variables,
                context_value=context,
                allow_subscriptions=True,
            )
        except Exception:  # pylint: disable=broad-except
            # Something is wrong - send ERROR message.
            await self._send_gql_error(operation_id, traceback.format_exc())
        else:
            # Receiving an observable means the subscription has been
            # processed. Otherwise it is just regular query or mutation.
            if isinstance(result, rx.Observable):
                # Client subscribed so subscribe to the observable
                # returned from GraphQL and respond with
                # the confirmation message.

                # Subscribe to the observable.
                # NOTE: Function `on_next` is called from a thread
                # where trigger runs, so it is necessary to wrap our
                # async method into `async_to_sync`, which carefully
                # runs given a coroutine in the current eventloop.
                # (See the implementation of `async_to_sync`
                # for details.)
                send_gql_data = async_to_sync(self._send_gql_data)
                result.subscribe(lambda r: send_gql_data(op_id, r.data, r.errors))

            else:
                # Query or mutation received - send a response
                # immediately.

                # We respond with a message of type `data`.
                # If Graphene complains that the request is invalid
                # (e.g. query contains syntax error), then the `data`
                # argument is None and the 'errors' argument contains
                # all errors that occurred before or during execution.
                # `result` is instance of ExecutionResult
                # Respond with data.
                await self._send_gql_data(op_id, result.data, result.errors)
                # Tell the client that the request processing is over.
                await self._send_gql_complete(op_id)

    async def _on_gql_stop(self, operation_id):
        """Process the STOP message."""
        # Currently only subscriptions can be stopped.
        # But we see that some clients (e.g. GraphiQL) send
        # the stop message even for queries and mutations.
        # We also see that the Apollo server ignores such messages,
        # so we ignore them as well.
        if operation_id not in self._subscriptions:
            return

        # Unsubscribe: stop listening corresponding groups and remove
        # subscription from the registry.
        waitlist = []
        subinf = self._subscriptions.pop(operation_id)
        for group in subinf.groups:
            waitlist.append(
                asyncio.ensure_future(
                    self.channel_layer.group_discard(group, self.channel_name)
                )
            )
            # Remove operation id from groups it belongs to. And remove
            # group from `_sids_by_group` if there are no subscriptions
            # in it.
            assert self._sids_by_group[group].count(operation_id) == 1, (
                f"Registry is inconsistent: group `{group}` has "
                f"`{self._sids_by_group[group].count(operation_id)}` "
                f"occurrences of operation_id=`{operation_id}`!"
            )
            self._sids_by_group[group].remove(operation_id)
            if not self._sids_by_group[group]:
                del self._sids_by_group[group]
        if waitlist:
            await asyncio.wait(waitlist)

        # Call subscription class `unsubscribed` handler.
        subinf.unsubscribed()

        # Send the unsubscription confirmation message.
        await self._send_gql_complete(operation_id)

    # -------------------------------------------------------- GRAPHQL PROTOCOL MESSAGES

    async def _send_gql_connection_ack(self):
        """Sent in reply to the `connection_init` request."""
        await self.send_json({"type": "connection_ack"})

    async def _send_gql_connection_error(self, error):
        """Connection error sent in reply to the `connection_init`."""
        await self.send_json({"type": "connection_error", "payload": error})

    async def _send_gql_data(self, operation_id, data, errors):
        """Send GraphQL `data` message to the client.

        Args:
            operation_id: Id of the operation the data belongs to.
            data: Dict with GraphQL query response.
            errors: List with exceptions occurred during processing the
                GraphQL query. (Errors happened in the resolvers.)
        """
        error_to_dict = graphene_django.views.GraphQLView.format_error
        await self.send_json(
            {
                "type": "data",
                "id": operation_id,
                "payload": {
                    "data": data,
                    # The `errors` key is only present when there is
                    # something to report.
                    **(
                        {"errors": [error_to_dict(e) for e in errors]}
                        if errors
                        else {}
                    ),
                },
            }
        )

    async def _send_gql_error(self, operation_id, error):
        """Tell client there is a query processing error.

        Server sends this message upon a failing operation.
        It can be an unexpected or unexplained GraphQL execution error
        or a bug in the code. It is unlikely that this is GraphQL
        validation errors (such errors are part of data message and
        must be sent by the `_send_gql_data` method).

        Args:
            operation_id: Id of the operation that failed on the server.
            error: String with the information about the error.
        """
        await self.send_json(
            {"type": "error", "id": operation_id, "payload": {"errors": [error]}}
        )

    async def _send_gql_complete(self, operation_id):
        """Send GraphQL `complete` message to the client.

        Args:
            operation_id: Id of the corresponding operation.
        """
        await self.send_json({"type": "complete", "id": operation_id})

    async def _send_gql_connection_keep_alive(self):
        """Send the keepalive (ping) message."""
        await self.send_json({"type": "ka"})

    # ------------------------------------------------------------------------ AUXILIARY

    async def _cleanup(self):
        """Cleanup before disconnect.

        Remove itself from the Channels groups, clear triggers and stop
        sending keepalive messages.
        """
        # The list of awaitables to simultaneously wait at the end.
        # Unsubscribe from the Channels groups first.
        waitlist = [
            asyncio.ensure_future(
                self.channel_layer.group_discard(group, self.channel_name)
            )
            for group in self._sids_by_group
        ]

        # Forget the keepalive task so that a repeated cleanup (e.g.
        # CONNECTION_TERMINATE followed by disconnect) does not handle
        # it twice.
        keepalive_task, self._keepalive_task = self._keepalive_task, None
        if keepalive_task is not None:
            # Stop sending keepalive messages.
            keepalive_task.cancel()
            waitlist.append(keepalive_task)

        if waitlist:
            await asyncio.wait(waitlist)

        self._subscriptions.clear()
        self._sids_by_group.clear()


# PK!P@jww&channels_graphql_ws/test_graphql_ws.py
# NOTE(review): the line above looks like a zip archive member header,
# i.e. the content below presumably belongs to a separate file
# `channels_graphql_ws/test_graphql_ws.py` - confirm before relying on it.
#
# coding: utf-8
# Copyright (c) 2018 DATADVANCE
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.
# IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
# CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT,
# TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

"""Tests GraphQL over WebSockets with subscriptions.

Here we test `Subscription` and `GraphqlWsConsumer` classes.
"""

# NOTE: Tests use the GraphQL over WebSockets setup. All the necessary
#       items (schema, query, subscriptions, mutations, Channels
#       consumer & application) are defined at the end on this file.

import json
import textwrap
import uuid

import channels
import channels.testing as ch_testing
import django.urls
import graphene
import pytest

from .graphql_ws import GraphqlWsConsumer, Subscription


# Default timeout. Increased to avoid TimeoutErrors on slow machines.
# Timeout handed to every communicator call below (presumably seconds).
TIMEOUT = 5


@pytest.mark.asyncio
async def test_main_usecase():
    """Test main use-case with the GraphQL over WebSocket.

    Connects, initializes the GraphQL connection, runs a plain query,
    subscribes, triggers the subscription with a mutation and checks
    the notification delivered to the subscriber.
    """

    # Channels communicator to test WebSocket consumers.
    comm = ch_testing.WebsocketCommunicator(
        application=my_app, path="graphql/", subprotocols=["graphql-ws"]
    )

    print("Establish WebSocket connection and check a subprotocol.")
    connected, subprotocol = await comm.connect(timeout=TIMEOUT)
    assert connected, "Could not connect to the GraphQL subscriptions WebSocket!"
    assert subprotocol == "graphql-ws", "Wrong subprotocol received!"

    print("Initialize GraphQL connection.")
    await comm.send_json_to({"type": "connection_init", "payload": ""})
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["type"] == "connection_ack"

    print("Make simple GraphQL query and check the response.")
    uniq_id = uuid.uuid4().hex
    await comm.send_json_to(
        {
            "id": uniq_id,
            "type": "start",
            "payload": {
                "query": textwrap.dedent(
                    """
                    query MyOperationName {
                        value
                    }
                    """
                ),
                "variables": {},
                "operationName": "MyOperationName",
            },
        }
    )

    # A query produces a `data` message followed by a `complete` one.
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == uniq_id, "Response id != request id!"
    assert resp["type"] == "data", "Type `data` expected!"
    assert "errors" not in resp["payload"]
    assert resp["payload"]["data"]["value"] == MyQuery.VALUE
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == uniq_id, "Response id != request id!"
    assert resp["type"] == "complete", "Type `complete` expected!"

    print("Subscribe to GraphQL subscription.")
    subscription_id = uuid.uuid4().hex
    await comm.send_json_to(
        {
            "id": subscription_id,
            "type": "start",
            "payload": {
                "query": textwrap.dedent(
                    """
                    subscription MyOperationName {
                        on_chat_message_sent(userId: ALICE) {
                            event
                        }
                    }
                    """
                ),
                "variables": {},
                "operationName": "MyOperationName",
            },
        }
    )

    print("Trigger the subscription by mutation to receive notification.")
    uniq_id = uuid.uuid4().hex
    message = "Hi!"
    await comm.send_json_to(
        {
            "id": uniq_id,
            "type": "start",
            "payload": {
                "query": textwrap.dedent(
                    """
                    mutation MyOperationName($message: String!) {
                        send_chat_message(message: $message) {
                            message
                        }
                    }
                    """
                ),
                "variables": {"message": message},
                "operationName": "MyOperationName",
            },
        }
    )

    # Mutation response.
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == uniq_id, "Response id != request id!"
    assert resp["type"] == "data", "Type `data` expected!"
    assert "errors" not in resp["payload"]
    assert resp["payload"]["data"] == {"send_chat_message": {"message": message}}
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == uniq_id, "Response id != request id!"
    assert resp["type"] == "complete", "Type `complete` expected!"

    # Subscription notification.
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == subscription_id, "Notification id != subscription id!"
    assert resp["type"] == "data", "Type `data` expected!"
    assert "errors" not in resp["payload"]
    event = resp["payload"]["data"]["on_chat_message_sent"]["event"]
    assert json.loads(event) == {
        "userId": UserId.ALICE,
        "payload": message,
    }, "Subscription notification contains wrong data!"

    print("Disconnect and wait the application to finish gracefully.")
    await comm.disconnect(timeout=TIMEOUT)
    await comm.wait(timeout=TIMEOUT)


@pytest.mark.asyncio
async def test_error_cases():
    """Test that server responds correctly when errors happen.

    Check that server responds with message of type `data` when there
    is a syntax error in the request or the exception in a resolver
    was raised. Check that server responds with message of type `error`
    when there was an exceptional situation, for example, field `query`
    of `payload` is missing or field `type` has a wrong value.
    """

    # Channels communicator to test WebSocket consumers.
    comm = ch_testing.WebsocketCommunicator(
        application=my_app, path="graphql/", subprotocols=["graphql-ws"]
    )

    print("Establish & initialize the connection.")
    await comm.connect(timeout=TIMEOUT)
    await comm.send_json_to({"type": "connection_init", "payload": ""})
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["type"] == "connection_ack"

    print("Check that wrong message type leads to the `error` response.")
    uniq_id = uuid.uuid4().hex
    await comm.send_json_to(
        {
            "id": uniq_id,
            "type": "wrong_type__(ツ)_/¯",
            "payload": {"variables": {}, "operationName": "MyOperationName"},
        }
    )
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == uniq_id, "Response id != request id!"
    assert resp["type"] == "error", "Type `error` expected!"
    # NOTE(review): this counts the entries of the payload mapping, not
    # the items of `payload["errors"]` - confirm that is the intent.
    assert len(resp["payload"]) == 1, "Single error expected!"
    assert isinstance(
        resp["payload"]["errors"][0], str
    ), "Error must be of string type!"

    print(
        "Check that query syntax error leads to the `data` response "
        "with `errors` array."
    )
    uniq_id = uuid.uuid4().hex
    await comm.send_json_to(
        {
            "id": uniq_id,
            "type": "start",
            "payload": {
                "query": textwrap.dedent(
                    """
                    This list produces a syntax error!
                    """
                ),
                "variables": {},
                "operationName": "MyOperationName",
            },
        }
    )
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == uniq_id, "Response id != request id!"
    assert resp["type"] == "data", "Type `data` expected!"
    payload = resp["payload"]
    assert payload["data"] is None
    errors = payload["errors"]
    assert len(errors) == 1, "Single error expected!"
    assert (
        "message" in errors[0] and "locations" in errors[0]
    ), "Response missing mandatory fields!"
    assert errors[0]["locations"] == [{"line": 1, "column": 1}]
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == uniq_id, "Response id != request id!"
    assert resp["type"] == "complete", "Type `complete` expected!"

    print(
        "Check that resolver error leads to the `data` response "
        "with `errors` array."
    )
    uniq_id = uuid.uuid4().hex
    await comm.send_json_to(
        {
            "id": uniq_id,
            "type": "start",
            "payload": {
                "query": textwrap.dedent(
                    """
                    query MyOperationName {
                        value(issue_error: true)
                    }
                    """
                ),
                "variables": {},
                "operationName": "MyOperationName",
            },
        }
    )
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == uniq_id, "Response id != request id!"
    assert resp["type"] == "data", "Type `data` expected!"
    payload = resp["payload"]
    assert payload["data"]["value"] is None
    errors = payload["errors"]
    assert len(errors) == 1, "Single error expected!"
    # The resolver raises `RuntimeError(MyQuery.VALUE)`, so the error
    # message must carry exactly that value.
    assert errors[0]["message"] == MyQuery.VALUE
    assert "locations" in errors[0]
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == uniq_id, "Response id != request id!"
    assert resp["type"] == "complete", "Type `complete` expected!"

    print("Check multiple errors in the `data` message.")
    uniq_id = uuid.uuid4().hex
    await comm.send_json_to(
        {
            "id": uniq_id,
            "type": "start",
            "payload": {
                "query": textwrap.dedent(
                    """
                    query {
                        projects {
                            path
                            wrong_filed
                        }
                    }

                    query a {
                        projects
                    }

                    {
                        wrong_name
                    }
                    """
                ),
                "variables": {},
                "operationName": "MyOperationName",
            },
        }
    )
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == uniq_id, "Response id != request id!"
    assert resp["type"] == "data", "Type `data` expected!"
    payload = resp["payload"]
    assert payload["data"] is None
    errors = payload["errors"]
    assert len(errors) == 5, "Five errors expected!"
    # Message is here: `This anonymous operation must be
    # the only defined operation`.
    assert errors[0]["message"] == errors[3]["message"]
    assert "locations" in errors[2], "The `locations` field expected"
    assert "locations" in errors[4], "The `locations` field expected"
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == uniq_id, "Response id != request id!"
    assert resp["type"] == "complete", "Type `complete` expected!"

    print("Disconnect and wait the application to finish gracefully.")
    await comm.disconnect(timeout=TIMEOUT)
    await comm.wait(timeout=TIMEOUT)


@pytest.mark.asyncio
async def test_connection_error():
    """Test that server responds correctly when connection errors happen.

    Check that server responds with message of type `connection_error`
    when there was an exception in `on_connect` method.
    """

    print("Prepare application.")

    class MyGraphqlWsConsumerConnectionError(GraphqlWsConsumer):
        """Channels WebSocket consumer which rejects every connection."""

        schema = ""

        async def on_connect(self, payload):
            """Reject the connection by raising `GraphQLError`."""
            from graphql.error import GraphQLError

            # Always close the connection.
            raise GraphQLError("Reject connection")

    application = channels.routing.ProtocolTypeRouter(
        {
            "websocket": channels.routing.URLRouter(
                [
                    django.urls.path(
                        "graphql-connection-error/", MyGraphqlWsConsumerConnectionError
                    )
                ]
            )
        }
    )

    # Channels communicator to test WebSocket consumers.
    comm = ch_testing.WebsocketCommunicator(
        application=application,
        path="graphql-connection-error/",
        subprotocols=["graphql-ws"],
    )

    print("Try to initialize the connection.")
    await comm.connect(timeout=TIMEOUT)
    await comm.send_json_to({"type": "connection_init", "payload": ""})
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["type"] == "connection_error"
    assert resp["payload"]["message"] == "Reject connection"
    # After the error message the raw close event is expected.
    resp = await comm.receive_output(timeout=TIMEOUT)
    assert resp["type"] == "websocket.close"
    assert resp["code"] == 4000

    print("Disconnect and wait the application to finish gracefully.")
    await comm.disconnect(timeout=TIMEOUT)
    await comm.wait(timeout=TIMEOUT)


@pytest.mark.asyncio
async def test_subscribe_unsubscribe():
    """Test subscribe-unsubscribe behavior with the GraphQL over WebSocket.

    0. Subscribe to GraphQL subscription: messages for Alice.
    1. Send STOP message and unsubscribe.
    2. Subscribe to GraphQL subscription: messages for Tom.
    3. Call unsubscribe method of the Subscription instance
       (via `kick_out_user` mutation).
    4. Execute some mutation.
    5. Check subscription notifications: there are no notifications.
    """

    # Channels communicator to test WebSocket consumers.
    comm = ch_testing.WebsocketCommunicator(
        application=my_app, path="graphql/", subprotocols=["graphql-ws"]
    )

    print("Establish and initialize WebSocket GraphQL connection.")
    await comm.connect(timeout=TIMEOUT)
    await comm.send_json_to({"type": "connection_init", "payload": ""})
    await comm.receive_json_from(timeout=TIMEOUT)

    print("Subscribe to GraphQL subscription.")
    sub_id = uuid.uuid4().hex
    await comm.send_json_to(
        {
            "id": sub_id,
            "type": "start",
            "payload": {
                "query": textwrap.dedent(
                    """
                    subscription MyOperationName {
                        on_chat_message_sent(userId: ALICE) {
                            event
                        }
                    }
                    """
                ),
                "variables": {},
                "operationName": "MyOperationName",
            },
        }
    )

    print("Stop subscription by id.")
    await comm.send_json_to({"id": sub_id, "type": "stop"})

    # Subscription notification with unsubscribe information.
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == sub_id, "Response id != request id!"
    assert resp["type"] == "complete", "Type `complete` expected!"

    print("Subscribe to GraphQL subscription.")
    sub_id = uuid.uuid4().hex
    await comm.send_json_to(
        {
            "id": sub_id,
            "type": "start",
            "payload": {
                "query": textwrap.dedent(
                    """
                    subscription MyOperationName {
                        on_chat_message_sent(userId: TOM) {
                            event
                        }
                    }
                    """
                ),
                "variables": {},
                "operationName": "MyOperationName",
            },
        }
    )

    print("Stop all subscriptions for TOM.")
    uniq_id = uuid.uuid4().hex
    await comm.send_json_to(
        {
            "id": uniq_id,
            "type": "start",
            "payload": {
                "query": textwrap.dedent(
                    """
                    mutation MyOperationName {
                        kick_out_user(userId: TOM) {
                            success
                        }
                    }
                    """
                ),
                "variables": {},
                "operationName": "MyOperationName",
            },
        }
    )

    # Mutation response.
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == uniq_id, "Response id != request id!"
    assert resp["type"] == "data", "Type `data` expected!"
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == uniq_id, "Response id != request id!"
    assert resp["type"] == "complete", "Type `complete` expected!"

    # Subscription notification with unsubscribe information.
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == sub_id, "Response id != request id!"
    assert resp["type"] == "complete", "Type `complete` expected!"

    print("Trigger the subscription by mutation to receive notification.")
    uniq_id = uuid.uuid4().hex
    message = "Is anybody here?"
    await comm.send_json_to(
        {
            "id": uniq_id,
            "type": "start",
            "payload": {
                "query": textwrap.dedent(
                    """
                    mutation MyOperationName($message: String!) {
                        send_chat_message(message: $message) {
                            message
                        }
                    }
                    """
                ),
                "variables": {"message": message},
                "operationName": "MyOperationName",
            },
        }
    )

    # Mutation response.
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == uniq_id, "Response id != request id!"
    assert resp["type"] == "data", "Type `data` expected!"
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["id"] == uniq_id, "Response id != request id!"
    assert resp["type"] == "complete", "Type `complete` expected!"

    # Check notifications: there are no notifications! Previously,
    # there was an unsubscription from all subscriptions.
    assert await comm.receive_nothing() is True, "No notifications expected!"

    print("Disconnect and wait the application to finish gracefully.")
    await comm.disconnect(timeout=TIMEOUT)
    await comm.wait(timeout=TIMEOUT)


@pytest.mark.asyncio
async def test_groups():
    """Test notification delivery across different subscription groups.

    0. Subscribe to the group1: messages for Alice.
    1. Subscribe to the group2: messages for Tom.
    2. Trigger group1 (send message to Alice) and check subscribed
       recipients: Alice.
    3. Trigger group2 (send message to Tom) and check subscribed
       recipients: Tom.
    4. Trigger all groups (send messages for all users) and check
       subscribed recipients: Alice, Tom.
    """

    async def create_and_subscribe(userId):
        """Establish and initialize WebSocket GraphQL connection.

        Subscribe to GraphQL subscription by userId.

        Args:
            userId: User ID for `on_chat_message_sent` subscription.
        Returns:
            sub_id: Subscription uid.
            comm: Client, instance of the `WebsocketCommunicator`.
        """

        comm = ch_testing.WebsocketCommunicator(
            application=my_app, path="graphql/", subprotocols=["graphql-ws"]
        )

        await comm.connect(timeout=TIMEOUT)
        await comm.send_json_to({"type": "connection_init", "payload": ""})
        await comm.receive_json_from(timeout=TIMEOUT)

        sub_id = uuid.uuid4().hex
        await comm.send_json_to(
            {
                "id": sub_id,
                "type": "start",
                "payload": {
                    "query": textwrap.dedent(
                        """
                        subscription MyOperationName($userId: UserId) {
                            on_chat_message_sent(userId: $userId) {
                                event
                            }
                        }
                        """
                    ),
                    "variables": {"userId": userId},
                    "operationName": "MyOperationName",
                },
            }
        )
        return sub_id, comm

    async def trigger_subscription(comm, userId, message):
        """Send a message to user using `send_chat_message` mutation.

        Args:
            comm: Client, instance of WebsocketCommunicator.
            userId: User ID for `send_chat_message` mutation.
            message: Any string message.
        """

        uniq_id = uuid.uuid4().hex
        await comm.send_json_to(
            {
                "id": uniq_id,
                "type": "start",
                "payload": {
                    "query": textwrap.dedent(
                        """
                        mutation MyOperationName($message: String!, $userId: UserId) {
                            send_chat_message(message: $message, userId: $userId) {
                                message
                            }
                        }
                        """
                    ),
                    "variables": {"message": message, "userId": userId},
                    "operationName": "MyOperationName",
                },
            }
        )

        # Mutation response: two messages carrying the request id are
        # consumed here so they do not get mixed up with notifications.
        resp = await comm.receive_json_from(timeout=TIMEOUT)
        assert resp["id"] == uniq_id, "Response id != request id!"
        resp = await comm.receive_json_from(timeout=TIMEOUT)
        assert resp["id"] == uniq_id, "Response id != request id!"

    def check_resp(resp, uid, user_id, message):
        """Check the response from `on_chat_message_sent` subscription.

        Args:
            resp: Decoded notification message received by the client.
            uid: Expected value of field `id` of response.
            user_id: Expected user ID.
            message: Expected message string.
        """
        assert resp["id"] == uid, "Notification id != subscription id!"
        assert resp["type"] == "data", "Type `data` expected!"
        assert "errors" not in resp["payload"]
        event = resp["payload"]["data"]["on_chat_message_sent"]["event"]
        assert json.loads(event) == {
            "userId": user_id,
            "payload": message,
        }, "Subscription notification contains wrong data!"

    async def disconnect(comm):
        """Disconnect and wait the application to finish gracefully."""
        await comm.disconnect(timeout=TIMEOUT)
        await comm.wait(timeout=TIMEOUT)

    print("Initialize the connection, create subscriptions.")
    alice_id = "ALICE"
    tom_id = "TOM"
    # Subscribe to messages for TOM.
    uid_tom, comm_tom = await create_and_subscribe(tom_id)
    # Subscribe to messages for Alice.
    uid_alice, comm_alice = await create_and_subscribe(alice_id)

    print("Trigger subscription: send message to Tom.")
    message = "Hi, Tom!"
    # Note: Strictly speaking, we do not have confidence that `comm_tom`
    # had enough time to subscribe. So Tom may not be able to receive
    # a message from Alice. But in this simple test, we performed the
    # Tom's subscription before the Alice's subscription and
    # that should be enough.
    await trigger_subscription(comm_alice, tom_id, message)
    # Check Tom's notifications.
    resp = await comm_tom.receive_json_from(timeout=TIMEOUT)
    check_resp(resp, uid_tom, UserId[tom_id].value, message)
    # Nobody else is expected to receive a notification.
    assert await comm_alice.receive_nothing() is True, "No notifications expected!"

    print("Trigger subscription: send message to Alice.")
    message = "Hi, Alice!"
    await trigger_subscription(comm_tom, alice_id, message)
    # Check Alice's notifications.
    resp = await comm_alice.receive_json_from(timeout=TIMEOUT)
    check_resp(resp, uid_alice, UserId[alice_id].value, message)
    # Nobody else is expected to receive a notification.
    assert await comm_tom.receive_nothing() is True, "No notifications expected!"

    print("Trigger subscription: send message to all groups.")
    message = "test... ping..."
    await trigger_subscription(comm_tom, None, message)

    # Check Tom's and Alice's notifications.
    resp = await comm_tom.receive_json_from(timeout=TIMEOUT)
    check_resp(resp, uid_tom, UserId[tom_id].value, message)
    resp = await comm_alice.receive_json_from(timeout=TIMEOUT)
    check_resp(resp, uid_alice, UserId[alice_id].value, message)

    # Disconnect.
    await disconnect(comm_tom)
    await disconnect(comm_alice)


@pytest.mark.asyncio
async def test_keepalive():
    """Test that server sends keepalive messages."""

    print("Prepare application.")

    class MyGraphqlWsConsumerKeepalive(GraphqlWsConsumer):
        """Channels WebSocket consumer which provides GraphQL API."""

        schema = ""
        # Period to send keepalive messages. Just some reasonable number.
        send_keepalive_every = 0.05

    application = channels.routing.ProtocolTypeRouter(
        {
            "websocket": channels.routing.URLRouter(
                [django.urls.path("graphql-keepalive/", MyGraphqlWsConsumerKeepalive)]
            )
        }
    )

    # Channels communicator to test WebSocket consumers.
    comm = ch_testing.WebsocketCommunicator(
        application=application, path="graphql-keepalive/", subprotocols=["graphql-ws"]
    )

    print("Establish & initialize the connection.")
    await comm.connect(timeout=TIMEOUT)
    await comm.send_json_to({"type": "connection_init", "payload": ""})
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert resp["type"] == "connection_ack"
    resp = await comm.receive_json_from(timeout=TIMEOUT)
    assert (
        resp["type"] == "ka"
    ), "Keepalive message expected right after `connection_ack`!"

    print("Receive several keepalive messages.")
    pings = []
    for _ in range(3):
        pings.append(await comm.receive_json_from(timeout=TIMEOUT))
    assert all(ping["type"] == "ka" for ping in pings)

    print("Send connection termination message.")
    await comm.send_json_to({"type": "connection_terminate"})

    print("Disconnect and wait the application to finish gracefully.")
    await comm.disconnect(timeout=TIMEOUT)
    await comm.wait(timeout=TIMEOUT)


# --------------------------------------------------------- GRAPHQL OVER WEBSOCKET SETUP


class UserId(graphene.Enum):
    """User IDs for sending messages."""

    TOM = 0
    ALICE = 1


class OnChatMessageSent(Subscription):
    """Test GraphQL subscription.

    Subscribe to receive messages by user ID.
    """

    event = graphene.JSONString()

    class Arguments:
        """That is how subscription arguments are defined."""

        userId = UserId()

    def subscribe(
        self, info, userId
    ):  # pylint: disable=unused-argument,arguments-differ
        """Specify subscription groups when client subscribes.

        Returns:
            A single-item list with the `user_<userId>` group name, or
            an empty list when `userId` is `None`.
        """
        assert self is None, "Root `self` expected to be `None`!"
        # Subscribe to the group corresponding to the user.
        if userId is not None:
            return [f"user_{userId}"]
        # Subscribe to default group.
        return []

    def publish(self, info, userId):  # pylint: disable=unused-argument,arguments-differ
        """Publish query result to the subscribers.

        Here `self` is used as the message payload of the event.
        """
        event = {"userId": userId, "payload": self}

        return OnChatMessageSent(event=event)

    @classmethod
    def notify(cls, userId, message):  # pylint: disable=arguments-differ
        """Example of the `notify` classmethod usage.

        Args:
            userId: User to notify; `None` means no particular group.
            message: Payload handed to `broadcast`.
        """
        # Find the subscription group for user.
        group = None if userId is None else f"user_{userId}"
        super().broadcast(group=group, payload=message)


class SendChatMessage(graphene.Mutation):
    """Test GraphQL mutation.

    Send message to the user or all users.
    """

    class Output(graphene.ObjectType):
        """Mutation result."""

        message = graphene.String()
        userId = UserId()

    class Arguments:
        """That is how mutation arguments are defined."""

        message = graphene.String(required=True)
        userId = graphene.Argument(UserId, required=False)

    def mutate(self, info, message, userId=None):  # pylint: disable=unused-argument
        """Send message to the user or all users."""
        assert self is None, "Root `self` expected to be `None`!"

        # Notify subscribers.
        OnChatMessageSent.notify(message=message, userId=userId)

        return SendChatMessage.Output(message=message, userId=userId)


class KickOutUser(graphene.Mutation):
    """Test GraphQL mutation.

    Stop all subscriptions associated with the user.
    """

    class Arguments:
        """That is how mutation arguments are defined."""

        userId = UserId()

    success = graphene.Boolean()

    def mutate(self, info, userId):  # pylint: disable=unused-argument
        """Unsubscribe everyone associated with the userId."""
        assert self is None, "Root `self` expected to be `None`!"

        OnChatMessageSent.unsubscribe(group=f"user_{userId}")

        return KickOutUser(success=True)


class MySubscription(graphene.ObjectType):
    """GraphQL subscriptions."""

    on_chat_message_sent = OnChatMessageSent.Field()


class MyMutation(graphene.ObjectType):
    """GraphQL mutations."""

    send_chat_message = SendChatMessage.Field()
    kick_out_user = KickOutUser.Field()


class MyQuery(graphene.ObjectType):
    """Root GraphQL query."""

    # Random value generated once at class creation; tests compare
    # query results and error messages against it.
    VALUE = uuid.uuid4().hex
    value = graphene.String(args={"issue_error": graphene.Boolean(default_value=False)})

    def resolve_value(self, info, issue_error):  # pylint: disable=unused-argument
        """Resolver to return predefined value which can be tested.

        Raises:
            RuntimeError: With `MyQuery.VALUE` as the message when
                `issue_error` is true.
        """
        assert self is None, "Root `self` expected to be `None`!"
        if issue_error:
            raise RuntimeError(MyQuery.VALUE)
        return MyQuery.VALUE


my_schema = graphene.Schema(
    query=MyQuery,
    subscription=MySubscription,
    mutation=MyMutation,
    auto_camelcase=False,
)


class MyGraphqlWsConsumer(GraphqlWsConsumer):
    """Channels WebSocket consumer which provides GraphQL API."""

    schema = my_schema


my_app = channels.routing.ProtocolTypeRouter(
    {
        "websocket": channels.routing.URLRouter(
            [django.urls.path("graphql/", MyGraphqlWsConsumer)]
        )
    }
)

# NOTE(review): the text below is not part of this module. It is the header and
# body of the next archive member (LICENSE), which shares a line with the end of
# the module in this dump; it is kept here as a comment so nothing is dropped.
# PK!Su++2django_channels_graphql_ws-0.1.2.dist-info/LICENSEMIT License Copyright (c) 2018 DATADVANCE Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
PK!H_zTT0django_channels_graphql_ws-0.1.2.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]n0H*J>mlcAPK!HiZjp3django_channels_graphql_ws-0.1.2.dist-info/METADATA_O0)& 04@$ (]֎Ϸ1=7wND Zqh$2} k?LRZ?6"O7k{"˄9r$0:K`%,F7`RxEC{  beC#sG3sSĹf3&MbΚeh'TʸTI?7!*K.ц0ŃP2XW;}̄L9byM<\ Bⶐ81wX XRa\K$sSuGhuȳLfP[ygK豴icfoNaoހ j]m:R/P *ڿ(cIXk_̡*ߟ_PK!H䚼Lf1django_channels_graphql_ws-0.1.2.dist-info/RECORD͹0g a(MNY "ssliְ:th(-xQ:͙j͓P˯F 5Vg9y>D9(K(~Η,mkw ˫V~sYNaWec CIVޮSѷgv\nNPp9S4Y]ľ ExV7 X'E O/{iȾ:(;̵1[~~7GT0MgyP@)?ΔA уNjM"rnrmr fm1{