PK!Lfgraphql_ws/__init__.pyfrom graphql_ws.abc import AbstractConnectionContext from graphql_ws.protocol import WS_INTERNAL_ERROR, WS_PROTOCOL, GQLMsgType from graphql_ws.server import SubscriptionServer __version__ = "1.0.0" PK![)graphql_ws/abc.pyimport asyncio import abc import collections.abc import typing class AbstractConnectionContext(collections.abc.MutableMapping): ws: typing.Any context_value: typing.Any tasks = typing.Set[asyncio.Task] _operations: typing.Dict[str, typing.AsyncIterator] def __init__(self, ws, context_value=None): self.ws = ws self.context_value = context_value self._operations = {} self.tasks = set() def __getitem__(self, key: str) -> typing.AsyncIterator: return self._operations[key] def __setitem__(self, key: str, value: typing.AsyncIterator) -> None: self._operations[key] = value def __delitem__(self, key: str) -> None: del self._operations[key] def __iter__(self) -> typing.Iterator[str]: return iter(self._operations) def __len__(self) -> int: return len(self._operations) @property @abc.abstractmethod def closed(self) -> bool: pass @abc.abstractmethod async def close(self, code: int) -> None: pass @abc.abstractmethod def receive(self) -> str: pass @abc.abstractmethod async def send(self, data: str) -> None: pass PK!tvMjjgraphql_ws/aiohttp.pyfrom aiohttp import WSMsgType, web from graphql_ws.abc import AbstractConnectionContext from graphql_ws.server import ConnectionClosed class AiohttpConnectionContext(AbstractConnectionContext): ws: web.WebSocketResponse # pylint: disable=C0103, invalid-name async def receive(self) -> str: message = await self.ws.receive() if message.type == WSMsgType.TEXT: return message.data if message.type in [ WSMsgType.CLOSED, WSMsgType.CLOSING, WSMsgType.ERROR, ]: raise ConnectionClosed @property def closed(self) -> bool: return self.ws.closed async def close(self, code: int) -> None: await self.ws.close(code=code) async def send(self, data: str) -> None: if self.closed: return await self.ws.send_str(data) PK!~\Egraphql_ws/protocol.pyimport collections.abc import enum import json import typing import graphql WS_INTERNAL_ERROR = 1011 WS_PROTOCOL = "graphql-ws" class GQLMsgType(enum.Enum): CONNECTION_INIT = "connection_init" # Client -> Server CONNECTION_ACK = "connection_ack" # Server -> Client CONNECTION_ERROR = "connection_error" # Server -> Client # NOTE: The keep alive message type does not follow the standard due to # connection optimizations CONNECTION_KEEP_ALIVE = "ka" # Server -> Client CONNECTION_TERMINATE = "connection_terminate" # Client -> Server START = "start" # Client -> Server DATA = "data" # Server -> Client ERROR = "error" # Server -> Client COMPLETE = "complete" # Server -> Client STOP = "stop" # Client -> Server class OperationMessagePayload(collections.abc.Mapping): __slots__ = ("_payload",) def __init__(self, payload: typing.Dict[str, typing.Any]): if payload is not None and not isinstance(payload, dict): raise TypeError("Payload must be an object") self._payload = payload or {} def __getitem__(self, key: str) -> typing.Any: return self._payload[key] def __iter__(self) -> typing.Iterator[str]: return iter(self._payload) def __len__(self) -> int: return len(self._payload) @property def query(self): return self.get("query") @property def variable_values(self): return self.get("variableValues") @property def operation_name(self): return self.get("operationName") @property def document(self) -> typing.Optional[graphql.DocumentNode]: try: return graphql.parse(self.query) except Exception: # pylint: disable=W0703, broad-except return None @property def source(self) -> graphql.Source: return graphql.Source(self.query) @property def has_subscription_operation(self) -> bool: document = self.document if not document: return False return any( [ definition.operation is graphql.OperationType.SUBSCRIPTION for definition in self.document.definitions ] ) class OperationMessage: __slots__ = ("_type", "_id", "_payload") _type: GQLMsgType _id: typing.Optional[str] _payload: typing.Optional[OperationMessagePayload] def __init__(self, type, id=None, payload=None): # pylint: disable=W0622, redefined-builtin self._type = GQLMsgType(type) self._id = id self._payload = OperationMessagePayload(payload) def __eq__(self, other: typing.Any) -> bool: if not isinstance(other, type(self)): return False return all( [ self.type == other.type, self.id == other.id, self.payload == other.payload, ] ) @property def id(self) -> str: return self._id @property def type(self) -> GQLMsgType: return self._type @property def payload(self) -> OperationMessagePayload: return self._payload @classmethod def load(cls, data: typing.Dict[str, typing.Any]) -> "OperationMessage": if not isinstance(data, dict): raise TypeError("Message must be an object") return cls( type=data.get("type"), id=data.get("id"), payload=data.get("payload"), ) @classmethod def loads(cls, data: str) -> "OperationMessage": return cls.load(json.loads(data)) PK!B graphql_ws/server.pyimport asyncio import json import typing import graphql from graphql_ws.abc import AbstractConnectionContext from graphql_ws.protocol import ( WS_INTERNAL_ERROR, GQLMsgType, OperationMessage, OperationMessagePayload, ) async def close_cancelling(agen): while True: try: task = asyncio.ensure_future(agen.__anext__()) await task yield task.result() except (GeneratorExit, StopAsyncIteration): await agen.aclose() task.cancel() break class ConnectionClosed(Exception): pass class SubscriptionServer: schema: graphql.GraphQLSchema connection_context_cls: AbstractConnectionContext def __init__(self, schema, connection_context_cls): self.schema = schema self.connection_context_cls = connection_context_cls async def handle(self, ws: typing.Any, context_value: typing.Any) -> None: connection_context = self.connection_context_cls(ws, context_value) await asyncio.shield(self._handle(connection_context)) async def _handle(self, connection_context: AbstractConnectionContext): await self.on_open(connection_context) while True: try: message = await connection_context.receive() except ConnectionClosed: break else: connection_context.tasks.add( asyncio.ensure_future( self.on_message(connection_context, message) ) ) finally: connection_context.tasks = { task for task in connection_context.tasks if not task.done() } await self.on_close(connection_context) for task in connection_context.tasks: task.cancel() async def send_message( self, connection_context: AbstractConnectionContext, op_id: typing.Optional[str], type: GQLMsgType, payload: typing.Any, ) -> None: # pylint: disable=W0622, redefined-builtin message = {"type": type.value} if op_id is not None: message["id"] = op_id if payload is not None: message["payload"] = payload data = json.dumps(message) await connection_context.send(data) async def send_error( self, connection_context: AbstractConnectionContext, op_id: typing.Optional[str], error: Exception, error_type: typing.Optional[GQLMsgType] = None, ) -> None: if error_type is None: error_type = GQLMsgType.ERROR assert error_type in [GQLMsgType.CONNECTION_ERROR, GQLMsgType.ERROR], ( "error_type should be one of the allowed error messages " "GQLMessageType.CONNECTION_ERROR or GQLMsgType.ERROR" ) error_payload = {"message": str(error)} await self.send_message( connection_context, op_id, error_type, error_payload ) async def send_execution_result( self, connection_context: AbstractConnectionContext, op_id: str, execution_result: graphql.ExecutionResult, ) -> None: result = {} if execution_result.data: result["data"] = execution_result.data if execution_result.errors: result["errors"] = [ graphql.format_error(error) for error in execution_result.errors ] return await self.send_message( connection_context, op_id, GQLMsgType.DATA, result ) async def unsubscribe( self, connection_context: AbstractConnectionContext, op_id: str ) -> None: operation = connection_context.get(op_id) if operation: await operation.aclose() await self.on_operation_complete(connection_context, op_id) # ON methods async def on_close( self, connection_context: AbstractConnectionContext ) -> None: if not connection_context: return await asyncio.wait( [ self.unsubscribe(connection_context, op_id) for op_id in connection_context ] ) async def on_connect( self, connection_context: AbstractConnectionContext, payload: typing.Dict[str, typing.Any], ) -> None: pass async def on_connection_init( self, connection_context: AbstractConnectionContext, op_id: str, payload: typing.Dict[str, typing.Any], ) -> None: try: await self.on_connect(connection_context, payload) await self.send_message( connection_context, None, GQLMsgType.CONNECTION_ACK, None ) except Exception as exc: # pylint: disable=W0703, broad-except await self.send_error( connection_context, op_id, exc, GQLMsgType.CONNECTION_ERROR ) await connection_context.close(WS_INTERNAL_ERROR) async def on_connection_terminate( self, connection_context: AbstractConnectionContext ) -> None: await connection_context.close(WS_INTERNAL_ERROR) async def on_message( self, connection_context: AbstractConnectionContext, message: str ) -> None: try: loaded = OperationMessage.loads(message) except Exception as e: # pylint: disable=W0703, broad-except await self.send_error(connection_context, None, e) return if loaded.type is GQLMsgType.CONNECTION_INIT: await self.on_connection_init( connection_context, loaded.id, loaded.payload ) elif loaded.type is GQLMsgType.CONNECTION_TERMINATE: await self.on_connection_terminate(connection_context) elif loaded.type is GQLMsgType.START: await self.on_start(connection_context, loaded.id, loaded.payload) elif loaded.type is GQLMsgType.STOP: await self.on_stop(connection_context, loaded.id) async def on_open( self, connection_context: AbstractConnectionContext ) -> None: pass async def on_operation_complete( self, connection_context: AbstractConnectionContext, op_id: str ) -> None: pass async def on_start( self, connection_context: AbstractConnectionContext, op_id: str, payload: OperationMessagePayload, ) -> None: """ We shield the graphql executions as cancelling semi-complete executions can lead to inconsistent behavior (for example partial transactions) """ # If we already have a sub with this id, unsubscribe from it first if op_id in connection_context: await self.unsubscribe(connection_context, op_id) if payload.has_subscription_operation: result = await graphql.subscribe( self.schema, document=payload.document, context_value=connection_context.context_value, variable_values=payload.variable_values, operation_name=payload.operation_name, ) else: result = await graphql.graphql( self.schema, source=payload.source, context_value=connection_context.context_value, variable_values=payload.variable_values, operation_name=payload.operation_name, ) if not isinstance(result, typing.AsyncIterator): await self.send_execution_result(connection_context, op_id, result) return # agen = connection_context[op_id] = close_cancelling(result) connection_context[op_id] = result try: async for val in result: # pylint: disable=E1133, not-an-iterable await self.send_execution_result(connection_context, op_id, val) finally: if connection_context.get(op_id) == result: del connection_context[op_id] await self.send_message( connection_context, op_id, GQLMsgType.COMPLETE, None ) async def on_stop( self, connection_context: AbstractConnectionContext, op_id: str ) -> None: await self.unsubscribe(connection_context, op_id) PK!ݖU  graphql_ws/testing.pyimport asyncio import json import typing from graphql_ws.abc import AbstractConnectionContext from graphql_ws.server import ConnectionClosed CLOSED = "__CLOSED__" class TestWebsocket: transport: "TestWebsocketTransport" local = asyncio.Queue() remote = asyncio.Queue() def __init__(self, transport, local, remote): self.transport = transport self.local = local self.remote = remote async def receive(self) -> str: return await self.local.get() async def receive_json(self) -> typing.Dict[str, typing.Any]: return json.loads(await self.receive()) async def send(self, message: str) -> None: await self.remote.put(message) async def send_json(self, message: typing.Any) -> None: await self.send(json.dumps(message)) @property def closed(self) -> bool: return self.transport.closed async def close(self, code: int) -> None: await self.transport.close(code) class TestWebsocketTransport: client: TestWebsocket server: TestWebsocket _close_event: asyncio.Event close_code: typing.Optional[int] = None def __init__(self): queue1, queue2 = asyncio.Queue(), asyncio.Queue() self.client = TestWebsocket(self, queue1, queue2) self.server = TestWebsocket(self, queue2, queue1) self._close_event = asyncio.Event() @property def closed(self) -> bool: return self._close_event.is_set() async def close(self, code: int) -> None: self._close_event.set() await self.client.local.put(CLOSED) await self.server.local.put(CLOSED) self.close_code = code class TestConnectionContext(AbstractConnectionContext): ws: TestWebsocket def __init__(self, ws, context_value): super().__init__(ws, context_value) self.ws = ws async def receive(self) -> str: message = await self.ws.receive() if message == CLOSED: raise ConnectionClosed() return message @property def closed(self) -> bool: return self.ws.closed async def close(self, code: int) -> None: await self.ws.close(code) async def send(self, data: str) -> None: if self.closed: return await self.ws.send(data) PK!כD88'graphql_ws_next-1.0.0.dist-info/LICENSEThe MIT License (MIT) Copyright (c) 2018 – Devin Fee Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.PK!HW"TT%graphql_ws_next-1.0.0.dist-info/WHEEL A н#J."jm)Afb~ ڡ5 G7hiޅF4+-3ڦ/̖?XPK!H8*."(graphql_ws_next-1.0.0.dist-info/METADATAWo8 ~_!tMp_A[\vźk0؊4Zl˕NݚđHb/h#U1o7J0Zl4,kDބl\ЫQ,7Ti^rSMLeiQӰTR< ΪI|Nۧ^ ߮>__ϰZ*ƿsDV`v˰ΔsXȂW xJot|aXI fpBI ]!8ze9&/ E )?R E|΁LWG$,Z*rqh,(*ĎWkPj!S'Clѹ4vąTS#;2&JK/I#$}M"gUɷ(;|hc;8Mc 3mE$$ZnDf cZ x^:teDn& OT ' p΀'t r|rp5q0G{(U'q2>DB cnOV\Ɗ,#aɜSZ)e9b, ]t#nfeqZukioc* b(29(z7rJSbtBs>y#"Mw]bhqM=lb!ϖ[T>t &/!֩rJ4nuUZ ׷>PzݵHYU]5C[ ;L>~ ळJTcK{XJTf{eo{~?BL]ۈ<K ЫE:6ۨDq*wҐV |R(C=̰51K-#+ -^-jU ܢne86 ]her; Z= 6h0w ɱYeS,zuNpR8d w gSt['p赖vX"u(*pΊ"~|G"E>篨Fb,|fߢ gg1<,6~ijn5KiJ5΢j3m4fK{SBl:IcyG. [ܮq$<SݡF "]QˮL]3. 86kj8v96RZck.9_a"6!D`mm堭 f bR׹.gyԜ9xR,UG~N8%ViIPfG!;n.iXzjx8wf`g~ KCHpޥ~jkҁ۴T4.n~CVD(H"" -RIi1KJRJd |R%S;dwz J G.*-_3^G[s)s,z ZRIe TTBi~J= MJ -$Û6d1tz.Y23Ue)` ,g r67zt^Q}4azC PK!Hq&graphql_ws_next-1.0.0.dist-info/RECORDл@|vl PY4By _MFV秫 I/5/ʦ}v#oI6Q8PyWcHN?a G_K 06Oj syHKŽ3Z0?RbJbcekUQ%Me EXGZڞzeKֶos!V6c{IyޞyL3rMX57ЂxQK}eU]X(O1l-Xl5NKkҏ av7z?DCMv, I˞e_R7|QQfFMHCf㓼eu lt_zezr U$6cW-*_=jBoE QgKkI%jse"{fۂ[a>PK!Lfgraphql_ws/__init__.pyPK![)graphql_ws/abc.pyPK!tvMjjgraphql_ws/aiohttp.pyPK!~\E~ graphql_ws/protocol.pyPK!B graphql_ws/server.pyPK!ݖU  8graphql_ws/testing.pyPK!כD88',Bgraphql_ws_next-1.0.0.dist-info/LICENSEPK!HW"TT%Fgraphql_ws_next-1.0.0.dist-info/WHEELPK!H8*."(@Ggraphql_ws_next-1.0.0.dist-info/METADATAPK!Hq&Mgraphql_ws_next-1.0.0.dist-info/RECORDPK O