PK!!strawberry/__init__.py__version__ = "0.1.0" from .field import field # noqa from .mutation import mutation, subscription # noqa from .schema import Schema # noqa from .type import type, input # noqa from .scalars import ID # noqa PK!]~~strawberry/cli/__init__.pyimport click import sys import os from starlette.applications import Starlette from starlette.middleware.cors import CORSMiddleware import importlib import uvicorn import hupper from strawberry.contrib.starlette import GraphQLApp, GraphQLSubscriptionApp @click.group() def run(): pass @run.command("server") @click.argument("module", type=str) @click.option("-h", "--host", default="0.0.0.0", type=str) @click.option("-p", "--port", default=8000, type=int) def server(module, host, port): sys.path.append(os.getcwd()) reloader = hupper.start_reloader("strawberry.cli.run", verbose=False) schema_module = importlib.import_module(module) reloader.watch_files([schema_module.__file__]) app = Starlette(debug=True) app.add_middleware( CORSMiddleware, allow_headers=["*"], allow_origins=["*"], allow_methods=["*"] ) app.add_route("/graphql", GraphQLApp(schema_module.schema)) app.add_websocket_route("/graphql", GraphQLSubscriptionApp(schema_module.schema)) print(f"Running strawberry on http://{host}:{port}/graphql 🍓") uvicorn.run(app, host=host, port=port, log_level="error") PK!)QZZstrawberry/constants.pyIS_STRAWBERRY_FIELD = "_is_strawberry_field" IS_STRAWBERRY_INPUT = "_is_strawberry_input" PK!strawberry/contrib/__init__.pyPK!8oo(strawberry/contrib/starlette/__init__.pyfrom .app.graphql_app import GraphQLApp # noqa from .app.graphql_ws_app import GraphQLSubscriptionApp # noqa PK!,strawberry/contrib/starlette/app/__init__.pyPK! GG(strawberry/contrib/starlette/app/base.pyimport datetime import json import typing from pygments import highlight, lexers from pygments.formatters import Terminal256Formatter from ..utils.graphql_lexer import GraphqlLexer class BaseApp: def __init__(self, schema) -> None: self.schema = schema def _debug_log( self, operation_name: str, query: str, variables: typing.Dict["str", typing.Any] ): if operation_name == "IntrospectionQuery": return now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"[{now}]: {operation_name or 'No operation name'}") print(highlight(query, GraphqlLexer(), Terminal256Formatter())) if variables: variables_json = json.dumps(variables, indent=4) print(highlight(variables_json, lexers.JsonLexer(), Terminal256Formatter())) PK!Q/strawberry/contrib/starlette/app/graphql_app.pyimport functools from graphql import graphql from graphql.error import format_error as format_graphql_error from starlette import status from starlette.background import BackgroundTasks from starlette.requests import Request from starlette.responses import HTMLResponse, JSONResponse, PlainTextResponse, Response from starlette.types import ASGIInstance, Receive, Scope, Send from .base import BaseApp from .utils import get_playground_template class GraphQLApp(BaseApp): def __init__(self, schema, playground: bool = True) -> None: self.schema = schema self.playground = playground def __call__(self, scope: Scope) -> ASGIInstance: return functools.partial(self.asgi, scope=scope) async def asgi(self, receive: Receive, send: Send, scope: Scope) -> None: request = Request(scope, receive=receive) response = await self.handle_graphql(request) await response(receive, send) async def handle_graphql(self, request: Request) -> Response: if request.method in ("GET", "HEAD"): if "text/html" in request.headers.get("Accept", ""): if not self.playground: return PlainTextResponse( "Not Found", status_code=status.HTTP_404_NOT_FOUND ) return await self.handle_playground(request) elif request.method == "POST": content_type = request.headers.get("Content-Type", "") if "application/json" in content_type: data = await request.json() elif "application/graphql" in content_type: body = await request.body() text = body.decode() data = {"query": text} elif "query" in request.query_params: data = request.query_params else: return PlainTextResponse( "Unsupported Media Type", status_code=status.HTTP_415_UNSUPPORTED_MEDIA_TYPE, ) else: return PlainTextResponse( "Method Not Allowed", status_code=status.HTTP_405_METHOD_NOT_ALLOWED ) try: query = data["query"] variables = data.get("variables") operation_name = data.get("operationName") except KeyError: return PlainTextResponse( "No GraphQL query found in the request", status_code=status.HTTP_400_BAD_REQUEST, ) self._debug_log(operation_name, query, variables) background = BackgroundTasks() context = {"request": request, "background": background} result = await self.execute( query, variables=variables, context=context, operation_name=operation_name ) error_data = ( [format_graphql_error(err) for err in result.errors] if result.errors else None ) response_data = {"data": result.data, "errors": error_data} status_code = ( status.HTTP_400_BAD_REQUEST if result.errors else status.HTTP_200_OK ) return JSONResponse( response_data, status_code=status_code, background=background ) async def handle_playground(self, request: Request) -> Response: text = get_playground_template(str(request.url)) return HTMLResponse(text) async def execute(self, query, variables=None, context=None, operation_name=None): return await graphql( self.schema, query, variable_values=variables, operation_name=operation_name, context_value=context, ) PK!-a>>2strawberry/contrib/starlette/app/graphql_ws_app.pyimport functools import typing # from graphql.error import GraphQLError, format_error as format_graphql_error from graphql.language import parse from graphql.subscription import subscribe from starlette.types import ASGIInstance, Receive, Scope, Send from starlette.websockets import WebSocket from .base import BaseApp class GraphQLSubscriptionApp(BaseApp): def __call__(self, scope: Scope) -> ASGIInstance: return functools.partial(self.asgi, scope=scope) async def execute(self, query, variables=None, context=None, operation_name=None): return await subscribe( self.schema, parse(query), variable_values=variables, operation_name=operation_name, context_value=context, ) async def _send_message( self, websocket: WebSocket, type_: str, payload: typing.Any = None, id_: str = None, ) -> None: data = {"type": type_} if id_ is not None: data["id"] = id_ if payload is not None: data["payload"] = payload return await websocket.send_json(data) async def asgi(self, receive: Receive, send: Send, scope: Scope) -> None: assert scope["type"] == "websocket" websocket = WebSocket(scope, receive=receive, send=send) await websocket.accept(subprotocol="graphql-ws") await self._send_message(websocket, "connection_ack") # TODO: we should check that this is a proper connection init message await websocket.receive_json() data = await websocket.receive_json() id_ = data.get("id", "1") payload = data.get("payload", {}) data = await self.execute( payload["query"], payload["variables"], operation_name=payload["operationName"], ) async for result in data: # TODO: send errors if any await self._send_message(websocket, "data", {"data": result.data}, id_) await self._send_message(websocket, "complete") await websocket.close() PK!11)strawberry/contrib/starlette/app/utils.pyimport pathlib def get_playground_template(request_path: str) -> str: here = pathlib.Path(__file__).parents[1] templates_path = here / "templates" with open(templates_path / "playground.html") as f: template = f.read() return template.replace("{{REQUEST_PATH}}", request_path) PK![pKK6strawberry/contrib/starlette/templates/playground.html GraphQL Playground
Loading GraphQL Playground
PK!.strawberry/contrib/starlette/utils/__init__.pyPK!i3strawberry/contrib/starlette/utils/graphql_lexer.pyfrom pygments import token from pygments.lexer import RegexLexer class GraphqlLexer(RegexLexer): name = "GraphQL" aliases = ["graphql", "gql"] filenames = ["*.graphql", "*.gql"] mimetypes = ["application/graphql"] tokens = { "root": [ (r"#.*", token.Comment.Singline), (r"\.\.\.", token.Operator), (r'"[\u0009\u000A\u000D\u0020-\uFFFF]*"', token.String.Double), ( r"(-?0|-?[1-9][0-9]*)(\.[0-9]+[eE][+-]?[0-9]+|\.[0-9]+|[eE][+-]?[0-9]+)", token.Number.Float, ), (r"(-?0|-?[1-9][0-9]*)", token.Number.Integer), (r"\$+[_A-Za-z][_0-9A-Za-z]*", token.Name.Variable), (r"[_A-Za-z][_0-9A-Za-z]+\s?:", token.Text), (r"(type|query|mutation|@[a-z]+|on|true|false|null)\b", token.Keyword.Type), (r"[!$():=@\[\]{|}]+?", token.Punctuation), (r"[_A-Za-z][_0-9A-Za-z]*", token.Keyword), (r"(\s|,)", token.Text), ] } PK!  strawberry/exceptions.py# TODO: add links to docs from typing import List, Set class MissingReturnAnnotationError(Exception): """The field is missing the return annotation""" def __init__(self, field_name: str): message = ( f'Return annotation missing for field "{field_name}", ' "did you forget to add it?" ) super().__init__(message) class MissingArgumentsAnnotationsError(Exception): """The field is missing the annotation for one or more arguments""" def __init__(self, field_name: str, arguments: Set[str]): arguments_list: List[str] = sorted(list(arguments)) if len(arguments_list) == 1: argument = f'argument "{arguments_list[0]}"' else: head = ", ".join(arguments_list[:-1]) argument = f'arguments "{head}" and "{arguments_list[-1]}"' message = ( f"Missing annotation for {argument} " f'in field "{field_name}", did you forget to add it?' ) super().__init__(message) PK!@,{ strawberry/field.pyfrom typing import get_type_hints from graphql import GraphQLField from .constants import IS_STRAWBERRY_FIELD, IS_STRAWBERRY_INPUT from .exceptions import MissingArgumentsAnnotationsError, MissingReturnAnnotationError from .type_converter import get_graphql_type_for_annotation from .utils.dict_to_type import dict_to_type from .utils.inspect import get_func_args from .utils.str_converters import to_camel_case, to_snake_case from .utils.typing import ( get_list_annotation, get_optional_annotation, is_list, is_optional, ) def convert_args(args, annotations): """Converts a nested dictionary to a dictionary of strawberry input types.""" converted_args = {} for key, value in args.items(): key = to_snake_case(key) annotation = annotations[key] # we don't need to check about unions here since they are not # yet supported for arguments. # see https://github.com/graphql/graphql-spec/issues/488 is_list_of_args = False if is_optional(annotation): annotation = get_optional_annotation(annotation) if is_list(annotation): annotation = get_list_annotation(annotation) is_list_of_args = True if getattr(annotation, IS_STRAWBERRY_INPUT, False): if is_list_of_args: converted_args[key] = [dict_to_type(x, annotation) for x in value] else: converted_args[key] = dict_to_type(value, annotation) else: converted_args[key] = value return converted_args def field(wrap, *, is_subscription=False): setattr(wrap, IS_STRAWBERRY_FIELD, True) annotations = get_type_hints(wrap) name = wrap.__name__ if "return" not in annotations: raise MissingReturnAnnotationError(name) field_type = get_graphql_type_for_annotation(annotations["return"], name) function_arguments = set(get_func_args(wrap)) - {"self", "info"} arguments_annotations = { key: value for key, value in annotations.items() if key not in ["info", "return"] } annotated_function_arguments = set(arguments_annotations.keys()) arguments_missing_annotations = function_arguments - annotated_function_arguments if len(arguments_missing_annotations) > 0: raise MissingArgumentsAnnotationsError(name, arguments_missing_annotations) arguments = { to_camel_case(name): get_graphql_type_for_annotation(annotation, name) for name, annotation in arguments_annotations.items() } def resolver(source, info, **args): args = convert_args(args, arguments_annotations) return wrap(source, info, **args) if is_subscription: def _resolve(event, info): return event kwargs = {"subscribe": resolver, "resolve": _resolve} else: kwargs = {"resolve": resolver} wrap.field = GraphQLField(field_type, args=arguments, **kwargs) return wrap PK!rhstrawberry/mutation.pyfrom functools import partial from .field import field # Mutations are field, we might want to separate things in the long run # for example to provide better errors mutation = field subscription = partial(field, is_subscription=True) PK!o//strawberry/scalars.pyimport typing ID = typing.NewType("ID", str) PK!%fYYstrawberry/schema.pyfrom graphql import GraphQLSchema # TODO: typings class Schema(GraphQLSchema): def __init__(self, query, mutation=None, subscription=None): super().__init__( query=query.field, mutation=mutation.field if mutation else None, subscription=subscription.field if subscription else None, ) PK!yYYstrawberry/type.pyimport typing from functools import partial from dataclasses import dataclass from graphql import ( GraphQLField, GraphQLInputField, GraphQLInputObjectType, GraphQLObjectType, ) from graphql.utilities.schema_printer import print_type from .constants import IS_STRAWBERRY_FIELD, IS_STRAWBERRY_INPUT from .type_converter import REGISTRY, get_graphql_type_for_annotation from .utils.str_converters import to_camel_case def _get_resolver(cls, field_name): def _resolver(obj, info): # TODO: can we make this nicer? # does it work in all the cases? field_resolver = getattr(cls(**(obj.__dict__ if obj else {})), field_name) if getattr(field_resolver, IS_STRAWBERRY_FIELD, False): return field_resolver(obj, info) return field_resolver return _resolver def type(cls, *, is_input=False): def wrap(): name = cls.__name__ REGISTRY[name] = cls def repr_(self): return print_type(self.field) setattr(cls, "__repr__", repr_) annotations = typing.get_type_hints(cls, None, REGISTRY) def _get_fields(): FieldClass = GraphQLInputField if is_input else GraphQLField fields = { to_camel_case(key): FieldClass( get_graphql_type_for_annotation(value, key), **({} if is_input else {"resolve": _get_resolver(cls, key)}) ) for key, value in annotations.items() } fields.update( { to_camel_case(key): value.field for key, value in cls.__dict__.items() if getattr(value, IS_STRAWBERRY_FIELD, False) } ) return fields if is_input: cls.field = GraphQLInputObjectType(name, lambda: _get_fields()) setattr(cls, IS_STRAWBERRY_INPUT, True) else: cls.field = GraphQLObjectType(name, lambda: _get_fields()) return dataclass(cls, repr=False) return wrap() input = partial(type, is_input=True) PK!_Bc c strawberry/type_converter.pyfrom collections.abc import AsyncGenerator from graphql import ( GraphQLBoolean, GraphQLFloat, GraphQLID, GraphQLInt, GraphQLList, GraphQLNonNull, GraphQLString, GraphQLUnionType, ) from .scalars import ID from .utils.typing import is_union REGISTRY = { str: GraphQLString, int: GraphQLInt, float: GraphQLFloat, bool: GraphQLBoolean, ID: GraphQLID, } # TODO: make so that we don't pass force optional # we use that when trying to get the type for a # option field (which can either be a scalar or an object type) def get_graphql_type_for_annotation( annotation, field_name: str, force_optional: bool = False ): # TODO: this might lead to issues with types that have a field value is_field_optional = force_optional if hasattr(annotation, "field"): graphql_type = annotation.field else: annotation_name = getattr(annotation, "_name", None) if annotation_name == "List": list_of_type = get_graphql_type_for_annotation( annotation.__args__[0], field_name ) return GraphQLList(list_of_type) annotation_origin = getattr(annotation, "__origin__", None) if annotation_origin == AsyncGenerator: # async generators are used in subscription, we only need the yield type # https://docs.python.org/3/library/typing.html#typing.AsyncGenerator return get_graphql_type_for_annotation(annotation.__args__[0], field_name) elif is_union(annotation): types = annotation.__args__ non_none_types = [x for x in types if x != None.__class__] # noqa:E721 # optionals are represented as Union[type, None] if len(non_none_types) == 1: is_field_optional = True graphql_type = get_graphql_type_for_annotation( non_none_types[0], field_name, force_optional=True ) else: is_field_optional = None.__class__ in types # TODO: union types don't work with scalar types # so we want to return a nice error # also we want to make sure we have been passed # strawberry types graphql_type = GraphQLUnionType( field_name, [type.field for type in types] ) else: graphql_type = REGISTRY.get(annotation) if not graphql_type: raise ValueError(f"Unable to get GraphQL type for {annotation}") if is_field_optional: return graphql_type return GraphQLNonNull(graphql_type) PK!strawberry/utils/__init__.pyPK! bb strawberry/utils/dict_to_type.pyfrom dataclasses import is_dataclass def dict_to_type(dict, cls): fields = cls.__dataclass_fields__ kwargs = {} for name, field in fields.items(): if is_dataclass(field.type): kwargs[name] = dict_to_type(dict.get(name, {}), field.type) else: kwargs[name] = dict.get(name) return cls(**kwargs) PK!"3wQWWstrawberry/utils/inspect.pyimport inspect from typing import Any, Callable def get_func_args(func: Callable[[Any], Any]): """Returns a list of arguments for the function""" sig = inspect.signature(func) return [ arg_name for arg_name, param in sig.parameters.items() if param.kind == inspect.Parameter.POSITIONAL_OR_KEYWORD ] PK!3"strawberry/utils/str_converters.pyimport re # Adapted from this response in Stackoverflow # http://stackoverflow.com/a/19053800/1072990 def to_camel_case(snake_str): components = snake_str.split("_") # We capitalize the first letter of each component except the first one # with the 'capitalize' method and join them together. return components[0] + "".join(x.capitalize() if x else "_" for x in components[1:]) # From this response in Stackoverflow # http://stackoverflow.com/a/1176023/1072990 def to_snake_case(name): s1 = re.sub(r"(.)([A-Z][a-z]+)", r"\1_\2", name) return re.sub(r"([a-z0-9])([A-Z])", r"\1_\2", s1).lower() def to_const(string): return re.sub(r"[\W|^]+", "_", string).upper() PK!|6strawberry/utils/typing.pyimport typing def is_list(annotation): """Returns True if annotation is a typing.List""" annotation_origin = getattr(annotation, "__origin__", None) return annotation_origin == list def is_union(annotation): """Returns True if annotation is a typing.Union""" annotation_origin = getattr(annotation, "__origin__", None) return annotation_origin == typing.Union def is_optional(annotation): """Returns True if the annotation is typing.Optional[SomeType]""" # Optionals are represented as unions if not is_union(annotation): return False types = annotation.__args__ # A Union to be optional needs to have at least one None type return any([x == None.__class__ for x in types]) # noqa:E721 def get_optional_annotation(annotation): types = annotation.__args__ non_none_types = [x for x in types if x != None.__class__] # noqa:E721 return non_none_types[0] def get_list_annotation(annotation): return annotation.__args__[0] PK!HA>-G*13strawberry_graphql-0.5.5.dist-info/entry_points.txtN+I/N.,()*.)J,OJ-*E0s2JPK!ͪN00*strawberry_graphql-0.5.5.dist-info/LICENSEMIT License Copyright (c) 2018 Patrick Arminio Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!HnHTU(strawberry_graphql-0.5.5.dist-info/WHEEL A н#Z;/"d&F[xzw@Zpy3Fv]\fi4WZ^EgM_-]#0(q7PK!HUݒ:Lh+strawberry_graphql-0.5.5.dist-info/METADATA_O0w>E5 &1ty؍5Җ]˔ooFËO]` ma J&dHC/c ~*[4n`&$%ߠ=BR 0eIn]Â\{wJ_v~k-xRr䥍+LH ⍤(A0^%u٫҅i#h?omq6x0|?]Js5#lݻ+$Genecpɝz:kr[ͦc &48+N//8:=;-Hh0z6 +0 h:h} #N腃sPK!HhN/a )strawberry_graphql-0.5.5.dist-info/RECORDKH-~ zDD@ 7 Io7Y;,b:EhA=a^0 ?7"fr]΂m=3j#*"o'`f!NВ"bqʰ}??'NvWڷ>קq{AU OJ"llt?PKͮ_Y!l&d[0ܪ׌a=?j?\W8]mz-yAkǜM,vӕ^-[p$K5Y[IJ_SVZq-qZn[St;!Yؿam?D[*bY sB[e̋gthv8p`{5*U(^eCHeSD_CuxS1Kܞe i7f*Mg.< ?a}xP&= qSq+1Ds6usse׶kGpxS/fM'~3rjZA6Zn~t#m_,^t{MwCW0'Zl8=<0[XI]uMf_7W5a9{/W`o@gzyyѼJ/`ɓG87E# yIG U9*c&$Agug7nDi֑VXih %*dZe!So>UY|tAocThL]Wtg?7eWWxS5v99RUhPcL̪UD_Hj-ƙHeKoPK!!strawberry/__init__.pyPK!]~~ strawberry/cli/__init__.pyPK!)QZZstrawberry/constants.pyPK!Pstrawberry/contrib/__init__.pyPK!8oo(strawberry/contrib/starlette/__init__.pyPK!,Astrawberry/contrib/starlette/app/__init__.pyPK! GG(strawberry/contrib/starlette/app/base.pyPK!Q/ strawberry/contrib/starlette/app/graphql_app.pyPK!-a>>2strawberry/contrib/starlette/app/graphql_ws_app.pyPK!11)u"strawberry/contrib/starlette/app/utils.pyPK![pKK6#strawberry/contrib/starlette/templates/playground.htmlPK!.Wostrawberry/contrib/starlette/utils/__init__.pyPK!i3ostrawberry/contrib/starlette/utils/graphql_lexer.pyPK!  sstrawberry/exceptions.pyPK!@,{ 0xstrawberry/field.pyPK!rh strawberry/mutation.pyPK!o//+strawberry/scalars.pyPK!%fYYstrawberry/schema.pyPK!yYYstrawberry/type.pyPK!_Bc c strawberry/type_converter.pyPK!>strawberry/utils/__init__.pyPK! bb xstrawberry/utils/dict_to_type.pyPK!"3wQWWstrawberry/utils/inspect.pyPK!3"strawberry/utils/str_converters.pyPK!|6strawberry/utils/typing.pyPK!HA>-G*13ˤstrawberry_graphql-0.5.5.dist-info/entry_points.txtPK!ͪN00*Fstrawberry_graphql-0.5.5.dist-info/LICENSEPK!HnHTU(strawberry_graphql-0.5.5.dist-info/WHEELPK!HUݒ:Lh+Xstrawberry_graphql-0.5.5.dist-info/METADATAPK!HhN/a )strawberry_graphql-0.5.5.dist-info/RECORDPKj G