# Source listing of the gql-next 0.1.1 package.

# ===========================================================================
# gql/cli.py
# ===========================================================================
#!/usr/bin/env python
import click
import glob
import time
import os
from os.path import join as join_paths, isfile

from graphql import GraphQLSchema
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler, EVENT_TYPE_CREATED, EVENT_TYPE_MODIFIED

from gql.config import Config
from gql.query_parser import QueryParser, AnonymousQueryError, InvalidQueryError
from gql.renderer_dataclasses import DataclassesRenderer
from gql.utils_schema import load_schema

DEFAULT_CONFIG_FNAME = '.gql.json'

SCHEMA_PROMPT = click.style('Where is your schema?: ', fg='bright_white') + \
                click.style('(path or url) ', fg='bright_black', dim=False)

ENDPOINT_PROMPT = click.style('Where is your endpoint?: ', fg='bright_white') + \
                  click.style('(url) ', fg='bright_black', dim=False)

ROOT_PROMPT = click.style("What's the root of your project?: ", fg='bright_white') + \
              click.style('(path) ', fg='bright_black', dim=False)


def safe_remove(fname):
    try:
        os.remove(fname)
    except OSError:
        pass


@click.group()
def cli():
    pass


@cli.command()
@click.option('--schema', prompt=SCHEMA_PROMPT, default='http://localhost:4000')
@click.option('--endpoint', prompt=ENDPOINT_PROMPT, default='same as schema')
@click.option('--root', prompt=ROOT_PROMPT, default='./src')
@click.option('-c', '--config', 'config_filename', default=DEFAULT_CONFIG_FNAME, type=click.Path(exists=False))
def init(schema, endpoint, root, config_filename):
    if isfile(config_filename):
        click.confirm(f'{config_filename} already exists. Are you sure you want to continue?', abort=True)

    if endpoint == 'same as schema':
        endpoint = schema

    config = Config(
        schema=schema,
        endpoint=endpoint,
        documents=join_paths(root, '**/*.graphql')
    )

    config.save(config_filename)
    click.echo(f"Config file generated at {click.style(config_filename, fg='bright_white')}\n\n")
def process_file(filename: str, parser: QueryParser, renderer: DataclassesRenderer):
    root, _ext = os.path.splitext(filename)
    target_filename = root + '.py'

    click.echo(f'Parsing {filename} ... ', nl=False)
    with open(filename, 'r') as fin:
        query = fin.read()
        try:
            parsed = parser.parse(query)
            rendered = renderer.render(parsed)
            with open(target_filename, 'w') as outfile:
                outfile.write(rendered)
            click.secho('Success!', fg='bright_white')
        except AnonymousQueryError:
            click.secho('Failed!', fg='bright_red')
            click.secho('\tQuery is missing a name', fg='bright_black')
            safe_remove(target_filename)
        except InvalidQueryError as invalid_err:
            click.secho('Failed!', fg='bright_red')
            click.secho(f'\t{invalid_err}', fg='bright_black')
            safe_remove(target_filename)


@cli.command()
@click.option('-c', '--config', 'config_filename', default=DEFAULT_CONFIG_FNAME, type=click.Path(exists=True))
def run(config_filename):
    if not isfile(config_filename):
        click.echo(f'Could not find configuration file {config_filename}')
        return

    config = Config.load(config_filename)
    schema = load_schema(config.schema)

    filenames = glob.glob(config.documents, recursive=True)

    query_parser = QueryParser(schema)
    query_renderer = DataclassesRenderer(schema, config)

    for filename in filenames:
        process_file(filename, query_parser, query_renderer)


@cli.command()
@click.option('-c', '--config', 'config_filename', default=DEFAULT_CONFIG_FNAME, type=click.Path(exists=True))
def watch(config_filename):
    class Handler(FileSystemEventHandler):
        def __init__(self, config: Config, schema: GraphQLSchema):
            super().__init__()
            self.parser = QueryParser(schema)
            self.renderer = DataclassesRenderer(schema, config)

        def on_any_event(self, event):
            if event.is_directory:
                return

            if event.event_type in {EVENT_TYPE_CREATED, EVENT_TYPE_MODIFIED}:
                # Only regenerate for files matched by the configured documents glob
                filenames = [os.path.abspath(fn) for fn in glob.iglob(config.documents, recursive=True)]
                if event.src_path not in filenames:
                    return

                process_file(event.src_path, self.parser, self.renderer)

    if not isfile(config_filename):
        click.echo(f'Could not find configuration file {config_filename}')
        return

    config = Config.load(config_filename)
    schema = load_schema(config.schema)

    click.secho(f'Watching {config.documents}', fg='cyan')
    click.secho('Ready for changes...', fg='cyan')

    observer = Observer()
    observer.schedule(Handler(config, schema), os.path.abspath('./'), recursive=True)
    observer.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        observer.stop()
    observer.join()


if __name__ == '__main__':
    cli()


# ===========================================================================
# gql/clients/__init__.py
# ===========================================================================
from .asyncio import AsyncIOClient
from .sync import Client


# ===========================================================================
# gql/clients/asyncio.py
# ===========================================================================
from typing import Callable, Mapping, Union

import aiohttp


class AsyncIOClient:
    def __init__(self, endpoint, headers=None):
        self.endpoint = endpoint
        headers = headers or {}
        self.__headers = {
            **headers,
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'Accept-Encoding': 'gzip',
        }
        self.__session = None

    @property
    def session(self):
        # Lazily (re)create the session so a closed session is replaced
        if not self.__session or self.__session.closed:
            self.__session = aiohttp.ClientSession(headers=self.__headers)
        return self.__session

    async def call(self, query, variables=None, return_json=False,
                   on_before_callback: Callable[[Mapping[str, str], Mapping[str, str]], None] = None) -> Union[dict, str]:
        headers = self.__headers.copy()
        payload = {'query': query}
        if variables:
            payload['variables'] = variables

        # Give the caller a chance to mutate the payload and headers before sending
        if on_before_callback:
            on_before_callback(payload, headers)

        async with self.session.post(self.endpoint, json=payload, headers=headers, raise_for_status=True) as resp:
            if return_json:
                return await resp.json()
            return await resp.text()
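
# ---------------------------------------------------------------------------
# Usage sketch (not a file from the package): calling AsyncIOClient by hand.
# A minimal example, assuming a GraphQL server at the placeholder URL below;
# HelloQuery and the hello field are illustrative, not part of this package.
# ---------------------------------------------------------------------------
import asyncio

from gql.clients import AsyncIOClient


async def main():
    client = AsyncIOClient('http://localhost:4000/graphql')  # hypothetical endpoint
    # return_json=True returns the decoded response dict instead of the raw body text
    result = await client.call('query HelloQuery { hello }', return_json=True)
    print(result)
    # The client keeps its aiohttp session open for reuse; close it when done.
    await client.session.close()


asyncio.run(main())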
# ===========================================================================
# gql/clients/sync.py
# ===========================================================================
from typing import Callable, Mapping, Union

import requests


class Client:
    def __init__(self, endpoint, headers=None):
        self.endpoint = endpoint
        headers = headers or {}
        self.__headers = {
            **headers,
            'Content-Type': 'application/json',
            'Accept': 'application/json',
            'Accept-Encoding': 'gzip',
        }

    def call(self, query, variables=None, return_json=False,
             on_before_callback: Callable[[Mapping[str, str], Mapping[str, str]], None] = None) -> Union[dict, str]:
        headers = self.__headers.copy()
        payload = {'query': query}
        if variables:
            payload['variables'] = variables

        # Give the caller a chance to mutate the payload and headers before sending
        if on_before_callback:
            on_before_callback(payload, headers)

        response = requests.post(self.endpoint, json=payload, headers=headers)
        response.raise_for_status()
        return response.json() if return_json else response.text


# ===========================================================================
# gql/config.py
# ===========================================================================
from typing import Type, TypeVar
from dataclasses import dataclass

from dataclasses_json import dataclass_json

ConfigT = TypeVar('ConfigT', bound='Config')


@dataclass_json
@dataclass(frozen=True)
class Config:
    schema: str
    endpoint: str
    documents: str
    custom_header: str = ''

    @classmethod
    def load(cls: Type[ConfigT], filename: str) -> ConfigT:
        with open(filename, 'r') as fin:
            json_str = fin.read()
            return cls.from_json(json_str)  # pylint:disable=no-member

    def save(self, filename, pretty=True):
        with open(filename, 'w') as outfile:
            json_str = self.to_json(indent=2) if pretty else self.to_json()  # pylint:disable=no-member
            outfile.write(json_str)
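
# ---------------------------------------------------------------------------
# Usage sketch (not a file from the package): the Config round trip used by
# `gql init` and `gql run`. Values are placeholders; the field names come from
# the dataclass above. A saved .gql.json looks roughly like:
#
#     {
#       "schema": "http://localhost:4000",
#       "endpoint": "http://localhost:4000",
#       "documents": "./src/**/*.graphql",
#       "custom_header": ""
#     }
# ---------------------------------------------------------------------------
from gql.config import Config

config = Config(
    schema='http://localhost:4000',        # URL or file path handed to load_schema()
    endpoint='http://localhost:4000',      # URL the generated execute() methods call
    documents='./src/**/*.graphql',        # glob used to find query documents
)
config.save('.gql.json')
assert Config.load('.gql.json') == config  # frozen dataclass, so equality is by value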
# ===========================================================================
# gql/query_parser.py
# ===========================================================================
from typing import Any, List, Mapping, Union, cast
from dataclasses import dataclass, field

from graphql import GraphQLSchema, validate, parse, get_operation_ast, visit, Visitor, TypeInfo, TypeInfoVisitor, \
    GraphQLNonNull, is_scalar_type, GraphQLList, OperationDefinitionNode, NonNullTypeNode, TypeNode, GraphQLEnumType, \
    is_enum_type


@dataclass
class ParsedField:
    name: str
    type: str
    nullable: bool
    default_value: Any = None


@dataclass
class ParsedObject:
    name: str
    fields: List[ParsedField] = field(default_factory=list)
    parents: List[str] = field(default_factory=list)
    children: List['ParsedObject'] = field(default_factory=list)


@dataclass
class ParsedEnum:
    name: str
    values: Mapping[str, Any]


@dataclass
class ParsedVariableDefinition:
    name: str
    type: str
    nullable: bool
    default_value: Any = None


@dataclass
class ParsedOperation:
    name: str
    type: str
    variables: List[ParsedVariableDefinition] = field(default_factory=list)
    children: List[ParsedObject] = field(default_factory=list)


NodeT = Union[ParsedOperation, ParsedObject]


@dataclass
class ParsedQuery:
    query: str
    objects: List[NodeT] = field(default_factory=list)
    enums: List[ParsedEnum] = field(default_factory=list)


class FieldToTypeMatcherVisitor(Visitor):

    def __init__(self, schema: GraphQLSchema, type_info: TypeInfo, query: str):
        self.schema = schema
        self.type_info = type_info
        self.query = query
        self.parsed = ParsedQuery(query=self.query)
        self.dfs_path: List[ParsedObject] = []

    def push(self, obj: NodeT):
        self.dfs_path.append(obj)

    def pull(self) -> NodeT:
        return self.dfs_path.pop()

    @property
    def current(self) -> ParsedObject:
        return self.dfs_path[-1]

    # Document

    def enter_operation_definition(self, node: OperationDefinitionNode, *_args):
        name, operation = node.name, node.operation

        variables = []
        for var in node.variable_definitions:
            ptype, nullable, _ = self.__variable_type_to_python(var.type)
            variables.append(ParsedVariableDefinition(
                name=var.variable.name.value,
                type=ptype,
                nullable=nullable,
                default_value=var.default_value.value if var.default_value else None,
            ))

        parsed_op = ParsedOperation(
            name=name.value,
            type=str(operation.value),
            variables=variables,
            children=[
                ParsedObject(name=f'{name.value}Data')
            ]
        )
        self.parsed.objects.append(parsed_op)  # pylint:disable=no-member
        self.push(parsed_op)
        self.push(parsed_op.children[0])  # pylint:disable=unsubscriptable-object

        return node

    def leave_selection_set(self, node, *_):
        self.pull()
        return node

    # Fragments

    def enter_fragment_definition(self, node, *_):
        # Handled like an operation definition: a fragment becomes its own object
        obj = ParsedObject(
            name=node.name.value
        )
        self.parsed.objects.append(obj)  # pylint:disable=no-member
        self.push(obj)
        return node

    def enter_fragment_spread(self, node, *_):
        self.current.parents.append(node.name.value)
        return node

    # Field

    def enter_field(self, node, *_):
        name = node.alias.value if node.alias else node.name.value
        graphql_type = self.type_info.get_type()
        python_type, nullable, underlying_graphql_type = self.__scalar_type_to_python(graphql_type)

        parsed_field = ParsedField(
            name=name,
            type=python_type,
            nullable=nullable,
        )
        self.current.fields.append(parsed_field)

        if not is_scalar_type(underlying_graphql_type):
            if is_enum_type(underlying_graphql_type):
                enum_type = cast(GraphQLEnumType, self.schema.type_map[underlying_graphql_type.name])
                name = enum_type.name
                if not any(e.name == name for e in self.parsed.enums):  # pylint:disable=not-an-iterable
                    parsed_enum = ParsedEnum(
                        name=enum_type.name,
                        values={name: value.value or name for name, value in enum_type.values.items()}
                    )
                    self.parsed.enums.append(parsed_enum)  # pylint:disable=no-member
            else:
                obj = ParsedObject(
                    name=str(underlying_graphql_type)
                )
                self.current.children.append(obj)
                self.push(obj)

        return node

    @staticmethod
    def __scalar_type_to_python(scalar):
        nullable = True
        if isinstance(scalar, GraphQLNonNull):
            nullable = False
            scalar = scalar.of_type

        mapping = {
            'ID': 'str',
            'String': 'str',
            'Int': 'int',
            'Float': 'float',
            'Boolean': 'bool',
            'DateTime': 'DateTime',
        }

        if isinstance(scalar, GraphQLList):
            scalar = scalar.of_type
            mapping = f'List[{mapping.get(str(scalar), str(scalar))}]'
        else:
            mapping = mapping.get(str(scalar), str(scalar))

        return mapping, nullable, scalar

    @staticmethod
    def __variable_type_to_python(var_type: TypeNode):
        nullable = True
        if isinstance(var_type, NonNullTypeNode):
            nullable = False
            var_type = var_type.type

        mapping = {
            'ID': 'str',
            'String': 'str',
            'Int': 'int',
            'Float': 'float',
            'Boolean': 'bool',
            'DateTime': 'DateTime',
        }

        mapping = mapping.get(var_type.name.value, var_type.name.value)
        return mapping, nullable, var_type


class AnonymousQueryError(Exception):
    def __init__(self):
        super().__init__('All queries must be named')


class InvalidQueryError(Exception):
    def __init__(self, errors):
        self.errors = errors
        message = '\n'.join(str(err) for err in errors)
        super().__init__(message)


class QueryParser:
    def __init__(self, schema: GraphQLSchema):
        self.schema = schema

    def parse(self, query: str, should_validate: bool = True) -> ParsedQuery:
        document_ast = parse(query)
        operation = get_operation_ast(document_ast)

        if not operation.name:
            raise AnonymousQueryError()

        if should_validate:
            errors = validate(self.schema, document_ast)
            if errors:
                raise InvalidQueryError(errors)

        type_info = TypeInfo(self.schema)
        visitor = FieldToTypeMatcherVisitor(self.schema, type_info, query)
        visit(document_ast, TypeInfoVisitor(type_info, visitor))
        return visitor.parsed
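
# ---------------------------------------------------------------------------
# Usage sketch (not a file from the package): parsing a named query against a
# schema built from SDL with graphql-core's build_schema. The schema and query
# below are illustrative placeholders.
# ---------------------------------------------------------------------------
from graphql import build_schema

from gql.query_parser import QueryParser

schema = build_schema('''
type Query {
    hello: String!
}
''')

parsed = QueryParser(schema).parse('query HelloQuery { hello }')

op = parsed.objects[0]        # the ParsedOperation node
print(op.name, op.type)       # HelloQuery query
print(op.children[0].name)    # HelloQueryData
print(op.children[0].fields)  # [ParsedField(name='hello', type='str', nullable=False, ...)]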
# ===========================================================================
# gql/renderer_dataclasses.py
# ===========================================================================
from graphql import GraphQLSchema

from gql.config import Config
from gql.utils_codegen import CodeChunk
from gql.query_parser import ParsedQuery, ParsedField, ParsedObject, ParsedEnum, ParsedOperation, ParsedVariableDefinition


class DataclassesRenderer:
    def __init__(self, schema: GraphQLSchema, config: Config):
        self.schema = schema
        self.config = config

    def render(self, parsed_query: ParsedQuery):
        buffer = CodeChunk()
        buffer.write('# AUTOGENERATED file. Do not change!')
        buffer.write('from functools import partial')
        buffer.write('from typing import Any, Callable, Mapping, List')
        buffer.write('from enum import Enum')
        buffer.write('from dataclasses import dataclass, field')
        buffer.write('from dataclasses_json import dataclass_json')
        buffer.write('from gql.clients import Client, AsyncIOClient')
        buffer.write('')

        if self.config.custom_header:
            buffer.write_lines(self.config.custom_header.split('\n'))
            buffer.write('')

        self.__render_datetime_field(buffer)

        # Enums
        if parsed_query.enums:
            self.__render_enum_field(buffer)
            for enum in parsed_query.enums:
                self.__render_enum(buffer, enum)

        # Fragment nodes must be rendered before the operations that depend on
        # them, so sort operation nodes to the end.
        sorted_objects = sorted(parsed_query.objects, key=lambda obj: 1 if isinstance(obj, ParsedOperation) else 0)
        for obj in sorted_objects:
            if isinstance(obj, ParsedObject):
                self.__render_object(parsed_query, buffer, obj)
            elif isinstance(obj, ParsedOperation):
                self.__render_operation(parsed_query, buffer, obj)

        return str(buffer)

    @staticmethod
    def __render_enum_field(buffer: CodeChunk):
        with buffer.write_block('def enum_field(enum_type):'):
            with buffer.write_block('def encode_enum(value):'):
                buffer.write('return value.value')
            buffer.write('')
            with buffer.write_block('def decode_enum(t, value):'):
                buffer.write('return t(value)')
            buffer.write('')
            buffer.write("return field(metadata={'dataclasses_json': {'encoder': encode_enum, 'decoder': partial(decode_enum, enum_type)}})")
        buffer.write('')

    @staticmethod
    def __render_datetime_field(buffer: CodeChunk):
        buffer.write('')
        buffer.write('from datetime import datetime')
        buffer.write('from marshmallow import fields as marshmallow_fields')
        buffer.write("DATETIME_FIELD = field(metadata={'dataclasses_json': {'encoder': datetime.isoformat, 'decoder': datetime.fromisoformat, 'mm_field': marshmallow_fields.DateTime(format='iso')}})")
        buffer.write('')

    def __render_object(self, parsed_query: ParsedQuery, buffer: CodeChunk, obj: ParsedObject):
        class_parents = '' if not obj.parents else f'({", ".join(obj.parents)})'
        buffer.write('@dataclass_json')
        buffer.write('@dataclass')
        with buffer.write_block(f'class {obj.name}{class_parents}:'):
            # Render nested child objects first
            for child_object in obj.children:
                self.__render_object(parsed_query, buffer, child_object)

            # Render fields, nullable ones last (they carry default values)
            sorted_fields = sorted(obj.fields, key=lambda f: 1 if f.nullable else 0)
            for parsed_field in sorted_fields:
                self.__render_field(parsed_query, buffer, parsed_field)

            # An empty class body still needs a statement
            if not (obj.children or obj.fields):
                buffer.write('pass')

        buffer.write('')

    def __render_operation(self, parsed_query: ParsedQuery, buffer: CodeChunk, parsed_op: ParsedOperation):
        buffer.write('@dataclass_json')
        buffer.write('@dataclass')
        with buffer.write_block(f'class {parsed_op.name}:'):
            buffer.write('__QUERY__ = """')
            buffer.write(parsed_query.query)
            buffer.write('"""')
            buffer.write('')

            # Render children
            for child_object in parsed_op.children:
                self.__render_object(parsed_query, buffer, child_object)

            # Operation fields
            buffer.write('')
            buffer.write(f'data: {parsed_op.name}Data = None')
            buffer.write('errors: Any = None')
            buffer.write('')

            # Execution methods
            if parsed_op.variables:
                vars_args = ', '.join([self.__render_variable_definition(var) for var in parsed_op.variables]) + ','
                variables_dict = '{' + ', '.join(f'"{var.name}": {var.name}' for var in parsed_op.variables) + '}'
            else:
                vars_args = ''
                variables_dict = 'None'

            buffer.write('@classmethod')
            with buffer.write_block(f'def execute(cls, {vars_args} on_before_callback: Callable[[Mapping[str, str], Mapping[str, str]], None] = None):'):
                buffer.write(f"client = Client('{self.config.endpoint}')")
                buffer.write(f'variables = {variables_dict}')
                buffer.write('response_text = client.call(cls.__QUERY__, variables=variables, on_before_callback=on_before_callback)')
                buffer.write('return cls.from_json(response_text)')

            buffer.write('')

            buffer.write('@classmethod')
            with buffer.write_block(f'async def execute_async(cls, {vars_args} on_before_callback: Callable[[Mapping[str, str], Mapping[str, str]], None] = None):'):
                buffer.write(f"client = AsyncIOClient('{self.config.endpoint}')")
                buffer.write(f'variables = {variables_dict}')
                buffer.write('response_text = await client.call(cls.__QUERY__, variables=variables, on_before_callback=on_before_callback)')
                buffer.write('return cls.from_json(response_text)')

        buffer.write('')
        buffer.write('')

    @staticmethod
    def __render_variable_definition(var: ParsedVariableDefinition):
        if not var.nullable:
            return f'{var.name}: {var.type}'
        return f'{var.name}: {var.type} = {var.default_value or "None"}'

    @staticmethod
    def __render_field(parsed_query: ParsedQuery, buffer: CodeChunk, parsed_field: ParsedField):
        enum_names = [e.name for e in parsed_query.enums]
        is_enum = parsed_field.type in enum_names

        suffix = ''
        field_type = parsed_field.type
        if is_enum:
            suffix = f'= enum_field({parsed_field.type})'

        if parsed_field.type == 'DateTime':
            suffix = '= DATETIME_FIELD'
            field_type = 'datetime'

        if parsed_field.nullable:
            suffix = f'= {parsed_field.default_value}'

        buffer.write(f'{parsed_field.name}: {field_type} {suffix}')

    @staticmethod
    def __render_enum(buffer: CodeChunk, enum: ParsedEnum):
        with buffer.write_block(f'class {enum.name}(Enum):'):
            for value_name, value in enum.values.items():
                if isinstance(value, str):
                    value = f"'{value}'"
                buffer.write(f'{value_name} = {value}')
        buffer.write('')
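
# ---------------------------------------------------------------------------
# Usage sketch (not a file from the package): rendering a parsed query into a
# Python module. For the placeholder schema and query below, the emitted source
# roughly contains a HelloQuery dataclass holding __QUERY__, a nested
# HelloQueryData child with a `hello: str` field, and execute()/execute_async()
# classmethods bound to the configured endpoint.
# ---------------------------------------------------------------------------
from graphql import build_schema

from gql.config import Config
from gql.query_parser import QueryParser
from gql.renderer_dataclasses import DataclassesRenderer

schema = build_schema('type Query { hello: String! }')
config = Config(
    schema='http://localhost:4000',
    endpoint='http://localhost:4000',
    documents='./src/**/*.graphql',
)

parsed = QueryParser(schema).parse('query HelloQuery { hello }')
print(DataclassesRenderer(schema, config).render(parsed))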
# ===========================================================================
# gql/utils_codegen.py
# ===========================================================================
import os

SPACES = ' ' * 4


class CodeChunk:
    class Block:
        def __init__(self, codegen: 'CodeChunk'):
            self.gen = codegen

        def __enter__(self):
            self.gen.indent()
            return self.gen

        def __exit__(self, exc_type, exc_val, exc_tb):
            self.gen.unindent()

    def __init__(self):
        self.lines = []
        self.level = 0

    def indent(self):
        self.level += 1

    def unindent(self):
        if self.level > 0:
            self.level -= 1

    @property
    def indent_string(self):
        return self.level * SPACES

    def write(self, value: str, *args, **kwargs):
        value = self.indent_string + value
        if args or kwargs:
            value = value.format(*args, **kwargs)
        self.lines.append(value)

    def write_lines(self, lines):
        for line in lines:
            self.lines.append(self.indent_string + line)

    def block(self):
        return self.Block(self)

    def write_block(self, block_header: str, *args, **kwargs):
        self.write(block_header, *args, **kwargs)
        return self.block()

    def __str__(self):
        return os.linesep.join(self.lines)
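
# ---------------------------------------------------------------------------
# Usage sketch (not a file from the package): CodeChunk.write_block returns a
# context manager that indents everything written inside it by four spaces,
# which is how the renderer above nests classes and methods.
# ---------------------------------------------------------------------------
from gql.utils_codegen import CodeChunk

buffer = CodeChunk()
with buffer.write_block('def greet():'):
    buffer.write("return 'hello'")

print(str(buffer))
# def greet():
#     return 'hello'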
# ===========================================================================
# gql/utils_schema.py
# ===========================================================================
import os
import json

import requests
from graphql import get_introspection_query, build_client_schema


def load_introspection_from_server(url):
    query = get_introspection_query()
    response = requests.post(url, json={'query': query})
    if response.status_code == 200:
        return response.json()['data']

    raise Exception(f'Introspection query failed with status code {response.status_code}')


def load_introspection_from_file(filename):
    with open(filename, 'r') as fin:
        return json.load(fin)


def load_schema(uri):
    introspection = load_introspection_from_file(uri) if os.path.isfile(uri) else load_introspection_from_server(uri)
    return build_client_schema(introspection)
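
# ---------------------------------------------------------------------------
# Usage sketch (not a file from the package): load_schema dispatches on whether
# its argument is an existing file. Both the file path and the URL below are
# placeholders.
# ---------------------------------------------------------------------------
from gql.utils_schema import load_schema

# From a saved introspection result (the 'data' payload a server returns for
# get_introspection_query()):
schema = load_schema('./schema.json')

# Or straight from a running GraphQL server:
schema = load_schema('http://localhost:4000')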