PK!   typegql/__init__.pyfrom .core.graph import Graph, InputGraph, GraphInfo, GraphArgument, Connection from .core.schema import Schema from .core.types import ID, DateTime __all__ = ( 'Graph', 'InputGraph', 'GraphInfo', 'GraphArgument', 'Connection', 'Schema', 'ID', 'DateTime' ) PK!3typegql/client/__init__.pyfrom .client import Client PK!Y typegql/client/client.pyfrom typing import overload, Dict, Tuple import aiohttp from graphql import get_introspection_query, build_client_schema, DocumentNode, ExecutionResult from typegql.client.dsl import DSLSchema class Client: """ Usage: async with Client(url) as client: await client.introspection() dsl = client.dsl query = dsl.Query.clients_connection.select(dsl.ClientConnection.total_count) doc = dsl.query(query) result = await client.execute(doc) """ def __init__(self, url: str, auth=None, headers: Dict = None, use_json=True, timeout=None, camelcase=True): self.url = url self.session: aiohttp.ClientSession = None self.dsl: DSLSchema = None self.auth = auth self.headers = headers self.use_json = use_json self.timeout = timeout self.camelcase = camelcase async def init(self): self.session = self.session or aiohttp.ClientSession() async def __aenter__(self): await self.init() return self async def __aexit__(self, exc_type, exc_val, exc_tb): await self.close() async def close(self): await self.session.close() async def introspection(self): status, result = await self.execute(get_introspection_query()) assert status == 200 schema = build_client_schema(result.data) self.dsl = DSLSchema(schema, camelcase=self.camelcase) return schema @overload async def execute(self, document: DocumentNode, variable_values=None, timeout=None) -> ExecutionResult: pass @overload async def execute(self, query: str, variable_values=None, timeout=None) -> ExecutionResult: pass async def execute(self, query: str, variable_values=None, timeout=None) -> Tuple[int, ExecutionResult]: if isinstance(query, DocumentNode): query = self.dsl.as_string(query) payload = { 'query': query, 'variables': variable_values or {} } if self.use_json: body = {'json': payload} else: body = {'data': payload} async with self.session.post(self.url, auth=self.auth, headers=self.headers, timeout=timeout or self.timeout, **body) as response: result = await response.json() if self.use_json else response.text() assert 'errors' in result or 'data' in result, f'Received non-compatible response "{result}"' return response.status, ExecutionResult( errors=result.get('errors'), data=result.get('data') ) PK!""QQtypegql/client/dsl.pyimport collections import decimal from functools import partial from graphql import GraphQLField, print_ast, ast_from_value, GraphQLNonNull, GraphQLInputField, GraphQLList, \ GraphQLEnumType, GraphQLInputObjectType, OperationType, GraphQLString from graphql.language import ast from graphql.pyutils import snake_to_camel class DSLField: def __init__(self, name, f, camelcase=True): self.field = f self.ast_field = ast.FieldNode(name=ast.NameNode(value=name), arguments=[]) self.selection_set = None self.camelcase = camelcase def select(self, *fields): if not self.ast_field.selection_set: self.ast_field.selection_set = ast.SelectionSetNode(selections=[]) self.ast_field.selection_set.selections.extend(selections(*fields)) return self def __call__(self, *args, **kwargs): return self.args(*args, **kwargs) def alias(self, alias): self.ast_field.alias = ast.NameNode(value=alias) return self def args(self, **kwargs): if self.camelcase: self.args_to_camelcase(kwargs) for name, value in kwargs.items(): arg = self.field.args.get(name) assert arg, f'Invalid argument {name} for field {self.name}' arg_type_serializer = get_arg_serializer(arg.type) value = arg_type_serializer(value) self.ast_field.arguments.append( ast.ArgumentNode( name=ast.NameNode(value=name), value=get_ast_value(value) ) ) return self def args_to_camelcase(self, arguments): if not isinstance(arguments, dict): return keys = [k for k in arguments.keys()] for key in keys: if isinstance(arguments[key], list): for arg in arguments[key]: self.args_to_camelcase(arg) arguments[snake_to_camel(key, upper=False)] = arguments.pop(key) @property def ast(self): return self.ast_field @property def name(self): return self.ast.name.value class DSLType(object): def __init__(self, _type, camelcase=True): self.type = _type self.camelcase = camelcase def __getattr__(self, name): formatted_name, field_def = self.get_field(name) return DSLField(formatted_name, field_def, camelcase=self.camelcase) def get_field(self, name): if self.camelcase: name = snake_to_camel(name, upper=False) if name in self.type.fields: return name, self.type.fields[name] raise KeyError('Field {} doesnt exist in type {}.'.format(name, self.type.name)) class DSLSchema(object): def __init__(self, schema, camelcase=True): self.schema = schema self.camelcase = camelcase def __getattr__(self, name): type_def = self.schema.get_type(name) return DSLType(type_def, self.camelcase) def query(self, *fields, operation=OperationType.QUERY) -> ast.DocumentNode: return ast.DocumentNode( definitions=[ast.OperationDefinitionNode( operation=operation, selection_set=ast.SelectionSetNode( selections=list(selections(*fields)) ) )] ) def mutation(self, *fields) -> ast.DocumentNode: return self.query(*fields, operation=OperationType.MUTATION) def as_string(self, doc): return print_ast(doc) def field(f, **args): if isinstance(f, GraphQLField): return DSLField(f).args(**args) elif isinstance(f, DSLField): return f raise Exception('Received incompatible query field: "{}".'.format(field)) def selections(*fields): for _field in fields: yield field(_field).ast def get_ast_value(value): if isinstance(value, ast.Node): return value if isinstance(value, ast.ValueNode): return value if isinstance(value, str): return ast.StringValueNode(value=value) elif isinstance(value, bool): return ast.BooleanValueNode(value=value) elif isinstance(value, (float, decimal.Decimal)): return ast.FloatValueNode(value=value) elif isinstance(value, int): return ast.IntValueNode(value=value) elif isinstance(value, list): return ast.ListValueNode(values=[get_ast_value(v) for v in value]) return None def serialize_list(serializer, values): assert isinstance(values, collections.Iterable), 'Expected iterable, received "{}"'.format(repr(values)) result = list() for val in values: result.append(serializer(val)) return result def serialize_string(value): return ast.StringValueNode(value=value) def serialize_enum(arg_type, value): return ast.EnumValueNode(value=arg_type.serialize(value)) def serialize_input_object(arg_type, value): serializers = {k: get_arg_serializer(v) for k, v in arg_type.fields.items()} result = ast_from_value(value, arg_type) for f in result.fields: serialized = serializers[f.name.value](value[f.name.value]) if isinstance(f.value, ast.ListValueNode): f.value = ast.ListValueNode(values=serialized) else: f.value = serialized return result def get_arg_serializer(arg_type): if isinstance(arg_type, GraphQLNonNull): return get_arg_serializer(arg_type.of_type) if arg_type == GraphQLString: return serialize_string if isinstance(arg_type, GraphQLInputField): return get_arg_serializer(arg_type.type) if isinstance(arg_type, GraphQLList): inner_serializer = get_arg_serializer(arg_type.of_type) return partial(serialize_list, inner_serializer) if isinstance(arg_type, GraphQLEnumType): return partial(serialize_enum, arg_type) if isinstance(arg_type, GraphQLInputObjectType): return partial(serialize_input_object, arg_type) return partial(serialize_value, arg_type) def serialize_value(arg_type, value): return ast_from_value( str(value) if arg_type.serialize(value) is None else arg_type.serialize(value), arg_type ) PK!typegql/core/__init__.pyPK!:4h::typegql/core/execution.pyfrom inspect import isawaitable from typing import List, Any, Union from graphql import ExecutionContext, GraphQLField, FieldNode, GraphQLFieldResolver, GraphQLResolveInfo, GraphQLError, \ GraphQLSchema, is_introspection_type from graphql.execution.values import get_argument_values from graphql.pyutils import camel_to_snake class TGQLExecutionContext(ExecutionContext): async def await_result(self, result): return await result def resolve_field_value_or_error( self, field_def: GraphQLField, field_nodes: List[FieldNode], resolve_fn: GraphQLFieldResolver, source: Any, info: GraphQLResolveInfo ) -> Union[Exception, Any]: try: camelcase = getattr(info.schema, 'camelcase', False) arguments = get_argument_values(field_def, field_nodes[0], self.variable_values) if camelcase and not is_introspection_type(info.parent_type): self.to_snake(info, arguments) result = resolve_fn(source, info, **arguments) if isawaitable(result): return self.await_result(result) return result except GraphQLError as e: return e def to_snake(self, info, arguments): if not isinstance(arguments, dict): return keys = [k for k in arguments.keys()] for key in keys: if isinstance(arguments[key], list): for arg in arguments[key]: self.to_snake(info, arg) arguments[camel_to_snake(key)] = arguments.pop(key) PK!~))typegql/core/graph.pyfrom __future__ import annotations import dataclasses from enum import Enum from typing import get_type_hints, Type, List, Any, TypeVar, Generic import graphql from graphql.pyutils import snake_to_camel from .types import DateTime, ID, Dictionary @dataclasses.dataclass class GraphInfo: name: str = dataclasses.field(default='') required: bool = dataclasses.field(default=False) use_in_mutation: bool = dataclasses.field(default=True) description: str = dataclasses.field(default='') arguments: List[GraphArgument] = dataclasses.field(default_factory=list) class Graph: _types = { 'ID': graphql.GraphQLID, 'bool': graphql.GraphQLBoolean, 'int': graphql.GraphQLInt, 'float': graphql.GraphQLFloat, 'str': graphql.GraphQLString, 'datetime': DateTime(), 'Dict': Dictionary() } def __init__(self, **kwargs): for name, _ in get_type_hints(self.__class__).items(): if name not in kwargs: continue setattr(self, name, kwargs.get(name)) @classmethod def get_fields(cls, graph: Type[Graph], is_mutation=False, camelcase=True): result = dict() meta = getattr(graph, 'Meta', None) exclude = getattr(meta, 'graph_exclude', tuple()) for name, _type in get_type_hints(graph).items(): if name in exclude: continue info = getattr(meta, name, GraphInfo()) assert isinstance(info, GraphInfo), f'{graph.__name__} info for `{name}` MUST be of type `GraphInfo`' if is_mutation and not info.use_in_mutation: continue graph_type = cls.map_type(_type, is_mutation=is_mutation) if not graph_type: continue if cls.is_connection(_type): info.arguments.extend(cls.page_arguments()) if info.required: graph_type = graphql.GraphQLNonNull(graph_type) args = cls.arguments(info, camelcase) field_name = info.name or name if camelcase: field_name = snake_to_camel(field_name, upper=False) if is_mutation: result[field_name] = graph_type else: result[field_name] = graphql.GraphQLField(graph_type, description=info.description, args=args) return result @classmethod def map_type(cls, _type: Any, is_mutation=False): if isinstance(_type, graphql.GraphQLType): return _type try: type_name = _type.__name__ except AttributeError: type_name = _type._name if not type_name: type_name = _type.__origin__.__name__ if Graph.is_connection(_type): return Connection.get_fields(_type) if Graph.is_enum(_type): if type_name in cls._types: return cls._types.get(type_name) enum_type = graphql.GraphQLEnumType(type_name, _type) cls._types[type_name] = enum_type return enum_type if Graph.is_list(_type): inner = cls.map_type(_type.__args__[0], is_mutation=is_mutation) return graphql.GraphQLList(inner) if Graph.is_graph(_type): return cls.build_object_type(type_name, _type, is_mutation=is_mutation) return cls._types.get(type_name) @staticmethod def is_list(_type: Any) -> bool: try: return issubclass(_type.__origin__, List) except AttributeError: return False @staticmethod def is_enum(_type: Any) -> bool: try: return issubclass(_type, Enum) except TypeError: return False @staticmethod def is_graph(_type: Any) -> bool: try: return issubclass(_type, Graph) except TypeError: return False @staticmethod def is_connection(_type: Any) -> bool: try: return _type.__origin__ is Connection or issubclass(_type.__origin__, Connection) except (TypeError, AttributeError): return False @classmethod def build_object_type(cls, type_name, _type, info: GraphInfo=None, is_mutation=False): if is_mutation: type_name = f'{type_name}Mutation' if type_name in cls._types: return cls._types[type_name] fields = cls.get_fields(_type, is_mutation=is_mutation) if not is_mutation: graph_type = graphql.GraphQLObjectType(type_name, fields=fields) else: graph_type = graphql.GraphQLInputObjectType(type_name, fields=fields) if isinstance(info, GraphInfo): if info.required: graph_type = graphql.GraphQLNonNull(graph_type) cls._types[type_name] = graph_type return graph_type @classmethod def arguments(cls, info: GraphInfo, camelcase=True): result: graphql.GraphQLArgumentMap = dict() for arg in getattr(info, 'arguments', []): if not isinstance(arg, GraphArgument): continue _type = cls.map_type(arg.type, is_mutation=arg.is_input) if arg.required: _type = graphql.GraphQLNonNull(_type) arg_name = snake_to_camel(arg.name, False) if camelcase else arg.name result[arg_name] = graphql.GraphQLArgument(_type, description=arg.description) return result @classmethod def page_arguments(cls): return [ GraphArgument[int]('first', description='Retrieve only the first `n` nodes of this connection'), GraphArgument[int]('last', description='Retrieve only the last `n` nodes of this connection'), GraphArgument[str]('before', description='Retrieve nodes for this connection before this cursor'), GraphArgument[str]('after', description='Retrieve nodes for this connection after this cursor') ] T = TypeVar('T') class Node(Graph, Generic[T]): id: ID class Meta: id = GraphInfo(required=True) class Edge(Graph, Generic[T]): node: Node[T] cursor: str class Meta: node = GraphInfo(required=True, description='Scalar representing your data') cursor = GraphInfo(required=True, description='Pagination cursor') class PageInfo(Graph): has_next: bool has_previous: bool start_cursor: str end_cursor: str class Meta: has_next = GraphInfo(required=True, description='When paginating forwards, are there more items?') has_previous = GraphInfo(required=True, description='When paginating backwards, are there more items?') class Connection(Graph, Generic[T]): edges: List[Edge[T]] page_info: PageInfo class Meta: edges = GraphInfo(required=True, description='Connection edges') page_info = GraphInfo(required=True, description='Pagination information') @classmethod def build(cls): if 'Node' not in cls._types: cls._types['Node'] = graphql.GraphQLInterfaceType('Node', super().get_fields(Node)) if 'Edge' not in cls._types: cls._types['Edge'] = graphql.GraphQLInterfaceType('Edge', super().get_fields(Edge)) if 'PageInfo' not in cls._types: cls._types['PageInfo'] = graphql.GraphQLObjectType('PageInfo', super().get_fields(PageInfo)) if 'Connection' not in cls._types: cls._types['Connection'] = graphql.GraphQLInterfaceType('Connection', super().get_fields(Connection)) @classmethod def get_fields(cls, graph: Type[Graph], is_mutation=False, camelcase=True): if not Graph.is_connection(graph): return super().get_fields(graph, is_mutation=is_mutation, camelcase=camelcase) cls.build() connection_class = graph.__origin__ wrapped = graph.__args__[0] fields = {} meta = getattr(graph, 'Meta', None) for name, _type in get_type_hints(connection_class).items(): info = getattr(meta, name, GraphInfo()) if Graph.is_list(_type) and _type.__args__[0] is Edge[T]: inner = _type.__args__[0] graph_type = graphql.GraphQLList(cls.get_edge_field(inner.__origin__, wrapped, camelcase=camelcase)) else: graph_type = cls.map_type(_type) if info.required: graph_type = graphql.GraphQLNonNull(graph_type) field_name = info.name or name if camelcase: field_name = snake_to_camel(field_name, upper=False) fields[field_name] = graphql.GraphQLField(graph_type, description=info.description) type_name = f'{wrapped.__name__}Connection' return graphql.GraphQLObjectType(type_name, fields=fields, interfaces=(cls._types.get('Connection'),)) @classmethod def get_edge_field(cls, edge_type, inner: Type[T], camelcase=True): fields = dict() meta = getattr(edge_type, 'Meta', None) for name, _type in get_type_hints(edge_type).items(): info = getattr(meta, name, GraphInfo()) if _type is Node[T]: graph_type = cls.get_node_fields(inner) else: graph_type = cls.map_type(_type) if info.required: graph_type = graphql.GraphQLNonNull(graph_type) field_name = info.name or name if camelcase: field_name = snake_to_camel(field_name, upper=False) fields[field_name] = graph_type return graphql.GraphQLNonNull(graphql.GraphQLObjectType( f'{inner.__name__}Edge', fields=fields, interfaces=(cls._types.get('Edge'),) )) @classmethod def get_node_fields(cls, _type: Type[T]): return graphql.GraphQLObjectType( f'{_type.__name__}Node', fields=super().get_fields(_type), interfaces=(cls._types.get('Node'),) ) @dataclasses.dataclass class GraphArgument(Generic[T]): name: str description: str = '' required: bool = False is_input: bool = False @property def type(self): return self.__orig_class__.__args__[0] class InputGraph(Graph): @classmethod def get_fields(cls, graph: Type[InputGraph], is_mutation=False, camelcase=True): return super().get_fields(graph, is_mutation, camelcase) PK!Dv typegql/core/schema.pyimport logging from typing import Type, Callable, Any from graphql import GraphQLSchema, GraphQLObjectType, graphql, OperationType, validate_schema from graphql.pyutils import camel_to_snake from typegql.core.execution import TGQLExecutionContext from typegql.core.graph import Graph logger = logging.getLogger(__name__) class Schema(GraphQLSchema): def __init__(self, query: Type[Graph] = None, mutation: Type[Graph] = None, subscription: Type[Graph] = None, camelcase=True): super().__init__() self.camelcase = camelcase if query: self.query: Callable = query query_fields = query.get_fields(query, camelcase=self.camelcase) query = GraphQLObjectType( 'Query', fields=query_fields, ) if mutation: self.mutation: Callable = mutation mutation_fields = mutation.get_fields(mutation, camelcase=self.camelcase) mutation = GraphQLObjectType( 'Mutation', fields=mutation_fields ) if subscription: self.subscription: Callable = subscription subscription_fields = subscription.get_fields(subscription, camelcase=self.camelcase) subscription = GraphQLObjectType( 'Subscription', fields=subscription_fields ) super().__init__(query, mutation, subscription) errors = validate_schema(self) if errors: raise errors[0] def _field_resolver(self, source, info, **kwargs): field_name = info.field_name if self.camelcase: field_name = camel_to_snake(field_name) if info.operation.operation == OperationType.MUTATION and Graph.is_graph(source.__class__): try: mutation = getattr(source, f'mutate_{field_name}') return mutation(info, **kwargs) except AttributeError: return if Graph.is_graph(source.__class__): _type = source.__annotations__.get(field_name) if Graph.is_connection(_type): method = getattr(_type, 'resolve', None) if method: return method(source, field_name, _type.__args__[0], info, **kwargs) value = ( source.get(field_name) if isinstance(source, dict) else getattr(source, f'resolve_{field_name}', getattr(source, field_name, None)) ) if callable(value): return value(info, **kwargs) return value async def run(self, query: str, root: Graph = None, operation: str = None, context: Any = None, variables=None, middleware=None): if query.startswith('mutation') and not root: root = self.mutation() elif not root: root = self.query() result = await graphql(self, query, root_value=root, field_resolver=self._field_resolver, operation_name=operation, context_value=context, variable_values=variables, middleware=middleware, execution_context_class=TGQLExecutionContext) return result PK!XBtypegql/core/types.pyimport base64 from datetime import datetime from typing import Dict import graphql from graphql.language import ast class DateTime(graphql.GraphQLScalarType): def __init__(self, name='DateTime'): super().__init__( name=name, description='The `DateTime` scalar type represents a DateTime value as specified by ' '[iso8601](https://en.wikipedia.org/wiki/ISO_8601).', serialize=DateTime.serialize, parse_value=DateTime.parse_value, parse_literal=DateTime.parse_literal, ) @staticmethod def serialize(value: datetime): assert isinstance(value, datetime), 'datetime value expected' return value.isoformat() @staticmethod def parse_literal(node): if isinstance(node, ast.StringValueNode): try: return datetime.fromisoformat(node.value) except ValueError: pass @staticmethod def parse_value(value: str): try: return datetime.fromisoformat(value) except ValueError: pass class ID(graphql.GraphQLScalarType): @classmethod def encode(cls, value): if not isinstance(value, str): value = str(value) return base64.b64encode(value.encode()).decode() @classmethod def decode(cls, value): return base64.b64decode(value).decode() class Dictionary(graphql.GraphQLScalarType): def __init__(self, name='Dictionary'): super().__init__( name=name, description='Dictionary type / HashMap', serialize=Dictionary.serialize, parse_value=Dictionary.parse_value, parse_literal=Dictionary.parse_literal, ) @staticmethod def serialize(value: Dict): assert isinstance(value, dict), 'dict value expected' return value @staticmethod def parse_literal(node): if isinstance(node, ast.StringValueNode): try: return eval(node.value) except ValueError: pass @staticmethod def parse_value(value: str): try: return eval(value) except ValueError: pass PK!HlUTtypegql-1.0.7.dist-info/WHEEL HM K-*ϳR03rOK-J,/R(O-)T0343 /, (-JLR()*M IL*4KM̫PK!HS+GO typegql-1.0.7.dist-info/METADATAXO=u'R%)p']].pOzBvo_Ҥ-,"D|<3iQM1(r҂%D/+6σ?uQPL QVc IA"bĎ*2#(ZJVKRG H U2G5Oq(XT9ضкRp8ztm Ϯ?QL%daeb\sV2S8X@ 9v+(|,Epk.,H y7!)<1 sq$I3rTUR< fO|( sLy V~Wz|%rJ+bOΏƝ"d4"YQ*$J`GO(Xr!{~|¯a#Ru1NZe8a*ѱ(5] >ER c"E!Tdl7 $Cӂ?|#A>7-w{1MZpլCy^{ ~g d0@J)4c,'3ӒOkpѳRi[€-Ms%+܊WmOsd/-Ak'΄$rV0&m9tfh@?Nf)DQfs5ji(i5pK7AW$mJZȹVaB>?[X> 1E|H\G2]L{Nzc7(7r/^a W­E1kVֆ69?~ "z ?7Ffj`5b]{h9՛B94v n7?nl 7J`|g( yApEG>p#Զ!+̒H PK!H>H0o1typegql-1.0.7.dist-info/RECORDuɒ@}} X$Ӣ(( !II@;*znN$]_ >팡,48*+y18B&My:Xї KEP]"w12c?K~Z YQ2ۂvR8Lwza}Z2%MUە#gE*%m:R:-4yкػ@!0g16{p.թoLXKzDIEՈwxH H0o1&utypegql-1.0.7.dist-info/RECORDPK Nw