PK!9uwwagtail_graphql/__init__.py __version__ = '0.2.0' PK!;_+wagtail_graphql/actions.py# python import string from typing import Type, Set # django from django.utils.text import camel_case_to_spaces # graphene import graphene from graphene.types.generic import GenericScalar # graphene_django from graphene_django import DjangoObjectType # wagtail from wagtail.core.fields import StreamField from wagtail.core.models import Page as wagtailPage # wagtail forms from wagtail.contrib.forms.models import AbstractForm # wagtail settings from wagtail.contrib.settings.models import BaseSetting # app from .registry import registry from .permissions import with_page_permissions from .settings import url_prefix_for_site, RELAY # app types from .types import ( Page, Settings, FormError, FormField, ) def _add_form(cls: Type[AbstractForm], node: str, dict_params: dict) -> Type[graphene.Mutation]: if node in registry.forms: # pragma: no cover return registry.forms[node] registry.page_prefetch_fields.add(cls.__name__.lower()) dict_params['Meta'].interfaces += (Page,) dict_params['form_fields'] = graphene.List(FormField) def form_fields(self, _info): return list(FormField(name=field_.clean_name, field_type=field_.field_type, label=field_.label, required=field_.required, help_text=field_.help_text, choices=field_.choices, default_value=field_.default_value) for field_ in self.form_fields.all()) dict_params['resolve_form_fields'] = form_fields registry.pages[cls] = type(node, (DjangoObjectType,), dict_params) args = type("Arguments", (), {'values': GenericScalar(), "url": graphene.String(required=True)}) _node = node def mutate(_self, info, url, values): url_prefix = url_prefix_for_site(info) query = wagtailPage.objects.filter(url_path=url_prefix + url.rstrip('/') + '/') instance = with_page_permissions( info.context, query.specific() ).live().first() user = info.context.user # convert camelcase to dashes values = {camel_case_to_spaces(k).replace(' ', '-'): v for k, v in values.items()} form = instance.get_form(values, None, page=instance, user=user) if form.is_valid(): # form_submission instance.process_form_submission(form) return registry.forms[_node](result="OK") else: return registry.forms[_node](result="FAIL", errors=[FormError(*err) for err in form.errors.items()]) dict_params = { "Arguments": args, "mutate": mutate, "result": graphene.String(), "errors": graphene.List(FormError), } tp = type(node + "Mutation", (graphene.Mutation,), dict_params) # type: Type[graphene.Mutation] registry.forms[node] = tp return tp def _add_page(cls: Type[wagtailPage], node: str, dict_params: dict) -> Type[DjangoObjectType]: if cls in registry.pages: # pragma: no cover return registry.pages[cls] registry.page_prefetch_fields.add(cls.__name__.lower()) dict_params['Meta'].interfaces += (Page,) tp = type(node, (DjangoObjectType,), dict_params) # type: Type[DjangoObjectType] registry.pages[cls] = tp return tp def _add_setting(cls: Type[BaseSetting], node: str, dict_params: dict) -> Type[DjangoObjectType]: if not hasattr(cls, 'name'): # we always need a name field cls.name = cls.__name__ dict_params['Meta'].interfaces += (Settings,) tp = type(node, (DjangoObjectType,), dict_params) # type: Type[DjangoObjectType] registry.settings[node] = (tp, cls) return tp def _add_snippet(cls: type, node: str, dict_params: dict) -> Type[DjangoObjectType]: if cls in registry.snippets: # pragma: no cover return registry.snippets[cls] tp = type(node, (DjangoObjectType,), dict_params) # type: Type[DjangoObjectType] registry.snippets[cls] = tp registry.snippets_by_name[node] = tp return tp def _add_django_model(_cls: type, node: str, dict_params: dict) -> Type[DjangoObjectType]: if node in registry.django: # pragma: no cover return registry.django[node] tp = type(node, (DjangoObjectType,), dict_params) # type: Type[DjangoObjectType] registry.django[node] = tp return tp def _add_streamfields(cls: wagtailPage, node: str, dict_params: dict, app: str, prefix: str) -> None: from .types.streamfield import ( block_handler, stream_field_handler, ) for field in cls._meta.fields: if isinstance(field, StreamField): field_name = field.name stream_field_name = f"{node}{string.capwords(field_name, sep='_').replace('_', '')}" blocks = field.stream_block.child_blocks handlers = dict( (name, block_handler(block, app, prefix)) for name, block in blocks.items() ) f, resolve = stream_field_handler( stream_field_name, field_name, handlers ) dict_params.update({ field.name: f, "resolve_" + field.name: resolve }) def _register_model(registered: Set[type], cls: type, snippet: bool, app: str, prefix: str, override_name=None) -> None: if cls in registered: return prefix = prefix.format(app=string.capwords(app), cls=cls.__name__) node = override_name or prefix + cls.__name__ # dict parameters to create GraphQL type class Meta: model = cls interfaces = (graphene.relay.Node, ) if RELAY else tuple() dict_params = {'Meta': Meta} # add streamfield handlers _add_streamfields(cls, node, dict_params, app, prefix) if snippet: _add_snippet(cls, node, dict_params) elif issubclass(cls, AbstractForm): _add_form(cls, node, dict_params) elif issubclass(cls, wagtailPage): _add_page(cls, node, dict_params) elif issubclass(cls, BaseSetting): _add_setting(cls, node, dict_params) else: # Django Model _add_django_model(cls, node, dict_params) registered.add(cls) def add_app(app: str, prefix: str = '{app}') -> None: from django.contrib.contenttypes.models import ContentType from wagtail.snippets.models import get_snippet_models snippets = get_snippet_models() models = [mdl.model_class() for mdl in ContentType.objects.filter(app_label=app).all()] snippets = [s for s in snippets if s in models] to_register = [x for x in snippets + models if x is not None] registered: Set = set() # prefetch content_types ContentType.objects.get_for_models(*to_register) for cls in to_register: _register_model(registered, cls, cls in snippets, app, prefix) def add_apps_with_settings(settings: dict) -> None: apps = settings.get('APPS', []) for app in apps: prefixes = settings.get('PREFIX', {}) if isinstance(prefixes, str): prefix = prefixes # pragma: no cover else: prefix = prefixes.get(app, '{app}') add_app(app, prefix=prefix) if not apps: # pragma: no cover import logging logging.warning("No APPS specified for wagtail_graphql") def add_apps() -> None: from .settings import SETTINGS add_apps_with_settings(SETTINGS) # standard page _register_model(set(), wagtailPage, False, 'wagtailcore', '', override_name='BasePage') PK!)wagtail_graphql/apps.pyfrom django.apps import AppConfig # pragma: no cover class ApiConfig(AppConfig): # pragma: no cover name = 'wagtail_graphql' PK!M wagtail_graphql/permissions.py# python from typing import Any, Union # django from django.db.models import Q from django.contrib.auth.models import AnonymousUser # wagtail from wagtail.core.query import PageQuerySet from wagtail.core.models import PageViewRestriction, CollectionViewRestriction from wagtail.images.models import ImageQuerySet from wagtail.documents.models import DocumentQuerySet def with_page_permissions(request: Any, queryset: PageQuerySet) -> PageQuerySet: user = request.user # Filter by site if request.site: queryset = queryset.descendant_of(request.site.root_page, inclusive=True) else: # No sites configured return queryset.none() # pragma: no cover # Get live pages that are public and check groups and login permissions if user == AnonymousUser: queryset = queryset.public() elif user.is_superuser: pass else: current_user_groups = user.groups.all() q = Q() for restriction in PageViewRestriction.objects.all(): if (restriction.restriction_type == PageViewRestriction.PASSWORD) or \ (restriction.restriction_type == PageViewRestriction.LOGIN and not user.is_authenticated) or \ (restriction.restriction_type == PageViewRestriction.GROUPS and not any(group in current_user_groups for group in restriction.groups.all()) ): q = ~queryset.descendant_of_q(restriction.page, inclusive=True) queryset = queryset.filter(q).live() return queryset CollectionQSType = Union[ImageQuerySet, DocumentQuerySet] def with_collection_permissions(request: Any, queryset: CollectionQSType) -> CollectionQSType: user = request.user # Get live pages that are public and check groups and login permissions if user == AnonymousUser: queryset = queryset.public() elif user.is_superuser: pass else: current_user_groups = user.groups.all() q = Q() for restriction in CollectionViewRestriction.objects.all(): if (restriction.restriction_type == CollectionViewRestriction.PASSWORD) or \ (restriction.restriction_type == CollectionViewRestriction.LOGIN and not user.is_authenticated) or \ (restriction.restriction_type == CollectionViewRestriction.GROUPS and not any(group in current_user_groups for group in restriction.groups.all()) ): q &= ~Q(collection=restriction.collection) # q &= ~queryset.filter(collection) descendant_of_q(restriction.page, inclusive=True) queryset = queryset.filter(q) return queryset PK!߂wagtail_graphql/registry.py class RegistryItem(dict): @property def types(self) -> tuple: return tuple(self.values()) class Registry: _django = RegistryItem() _forms = RegistryItem() _pages = RegistryItem() _settings = RegistryItem() _snippets = RegistryItem() _snippets_by_name = RegistryItem() _streamfield_blocks = RegistryItem() _streamfield_scalar_blocks = RegistryItem() _page_prefetch = { 'content_type', 'owner', 'live_revision', 'page_ptr' } @property def blocks(self) -> RegistryItem: return self._streamfield_blocks @property def scalar_blocks(self) -> RegistryItem: return self._streamfield_scalar_blocks @property def django(self) -> RegistryItem: return self._django @property def forms(self) -> RegistryItem: return self._forms @property def pages(self) -> RegistryItem: return self._pages @property def settings(self) -> RegistryItem: return self._settings @property def snippets(self) -> RegistryItem: return self._snippets @property def snippets_by_name(self) -> RegistryItem: return self._snippets_by_name @property def rsnippets(self) -> RegistryItem: return RegistryItem((v, k) for k, v in self._snippets.items()) @property def page_prefetch_fields(self) -> set: return self._page_prefetch @property def models(self) -> dict: models: dict = {} models.update(self.pages) models.update(self.snippets) models.update(self.forms) models.update(self.django) models.update((k, v[0]) for k, v in self.settings.items()) models.update((k, v) for k, v in self.blocks.items() if not isinstance(v, tuple)) models.update(self.scalar_blocks.items()) return models @property def rmodels(self) -> dict: return dict((v, k) for k, v in self.models.items()) registry = Registry() PK!Hwagtail_graphql/relay.py# graphene import graphene # app from .settings import RELAY if RELAY: class RelayMixin: node = graphene.relay.Node.Field() else: class RelayMixin: # type: ignore pass PK!qpmmwagtail_graphql/schema.py# typings from typing import Any # noqa # django from django.utils.text import camel_case_to_spaces # graphql from graphql import ResolveInfo # graphene import graphene # graphene_django from graphene_django.converter import String # app from .relay import RelayMixin from .registry import registry from .actions import add_apps # add all the apps from the settings add_apps() # mixins from .types import ( # noqa: E402 AuthQueryMixin, LoginMutation, LogoutMutation, DocumentQueryMixin, ImageQueryMixin, InfoQueryMixin, MenusQueryMixin, PagesQueryMixin, SettingsQueryMixin, SnippetsQueryMixin, ) # api version GRAPHQL_API_FORMAT = (0, 2, 0) # mixins AuthQueryMixin_ = AuthQueryMixin() # type: Any DocumentQueryMixin_ = DocumentQueryMixin() # type: Any ImageQueryMixin_ = ImageQueryMixin() # type: Any InfoQueryMixin_ = InfoQueryMixin() # type: Any MenusQueryMixin_ = MenusQueryMixin() # type: Any PagesQueryMixin_ = PagesQueryMixin() # type: Any SettingsQueryMixin_ = SettingsQueryMixin() # type: Any SnippetsQueryMixin_ = SnippetsQueryMixin() # type: Any class Query(graphene.ObjectType, AuthQueryMixin_, DocumentQueryMixin_, ImageQueryMixin_, InfoQueryMixin_, MenusQueryMixin_, PagesQueryMixin_, SettingsQueryMixin_, SnippetsQueryMixin_, RelayMixin ): # API Version format = graphene.Field(String) def resolve_format(self, _info: ResolveInfo): return '%d.%d.%d' % GRAPHQL_API_FORMAT def mutation_parameters() -> dict: dict_params = { 'login': LoginMutation.Field(), 'logout': LogoutMutation.Field(), } dict_params.update((camel_case_to_spaces(n).replace(' ', '_'), mut.Field()) for n, mut in registry.forms.items()) return dict_params Mutations = type("Mutation", (graphene.ObjectType,), mutation_parameters() ) schema = graphene.Schema( query=Query, mutation=Mutations, types=list(registry.models.values()) ) PK!,wagtail_graphql/settings.py# django from django.conf import settings # graphql from graphql import ResolveInfo # settings if hasattr(settings, 'GRAPHQL_API'): SETTINGS = settings.GRAPHQL_API else: # pragma: no cover SETTINGS = {} URL_PREFIX = SETTINGS.get('URL_PREFIX', {}) LOAD_GENERIC_SCALARS = SETTINGS.get('GENERIC_SCALARS', True) RELAY = SETTINGS.get('RELAY', False) # wagtail settings try: from wagtail.contrib.settings.registry import registry as settings_registry except ImportError: # pragma: no cover settings_registry = None def url_prefix_for_site(info: ResolveInfo): hostname = info.context.site.hostname return URL_PREFIX.get( hostname, info.context.site.root_page.url_path.rstrip('/') ) PK! {TT!wagtail_graphql/types/__init__.pyfrom .core import ( Page, Site, User, # mixins InfoQueryMixin, PagesQueryMixin, ) from .auth import ( LoginMutation, LogoutMutation, # mixins AuthQueryMixin, ) from .documents import ( Document, # mixins DocumentQueryMixin, ) from .forms import FormField, FormError from .images import ( Image, # mixins ImageQueryMixin, ) from .settings import Settings, SettingsQueryMixin from .snippets import SnippetsQueryMixin # noinspection PyUnresolvedReferences from . import converters # noqa: F401 __all__ = [ # core 'Page', 'Settings', 'Site', 'User', # auth 'AuthQueryMixin', 'LoginMutation', 'LogoutMutation', # documents 'Document', # forms 'FormError', 'FormField', # images 'Image', # mixins 'DocumentQueryMixin', 'ImageQueryMixin', 'InfoQueryMixin', 'MenusQueryMixin', 'PagesQueryMixin', 'SettingsQueryMixin', 'SnippetsQueryMixin', ] # menus try: from django.conf import settings if 'wagtailmenus' not in settings.INSTALLED_APPS: raise ImportError() # pragma: no cover from .menus import MenusQueryMixin, Menu, MenuItem, SecondaryMenu, SecondaryMenuItem # noqa: F401 __all__.extend([ # menus 'Menu', 'MenuItem', 'SecondaryMenu', 'SecondaryMenuItem', ]) HAS_WAGTAILMENUS = True except ImportError: # pragma: no cover def MenusQueryMixin(): # type: ignore class _MenusQueryMixin: pass return _MenusQueryMixin HAS_WAGTAILMENUS = False PK!Uv_RRwagtail_graphql/types/auth.py# django from django.contrib.auth.models import User as wagtailUser, AnonymousUser # graphene import graphene from graphql.execution.base import ResolveInfo # app types from .core import User def AuthQueryMixin(): class Mixin: # User information user = graphene.Field(User) def resolve_user(self, info: ResolveInfo): user = info.context.user if isinstance(user, AnonymousUser): return wagtailUser(id='-1', username='anonymous') return user return Mixin class LoginMutation(graphene.Mutation): class Arguments: username = graphene.String(required=True) password = graphene.String(required=True) user = graphene.Field(User) def mutate(self, info, username, password): from django.contrib.auth import authenticate, login user = authenticate(info.context, username=username, password=password) if user is not None: login(info.context, user) else: user = wagtailUser(id='-1', username='anonymous') return LoginMutation(user=user) class LogoutMutation(graphene.Mutation): user = graphene.Field(User) def mutate(self, info): from django.contrib.auth import logout logout(info.context) return LogoutMutation(wagtailUser(id='-1', username='anonymous')) PK!rBh6ss#wagtail_graphql/types/converters.py# django from django.db import models # graphene_django from graphene import String from graphene_django.converter import convert_django_field @convert_django_field.register(models.DecimalField) @convert_django_field.register(models.DurationField) def convert_force_string(field, _registry=None): return String(description=field.help_text, required=not field.null) PK!cw w wagtail_graphql/types/core.py# typings from typing import cast # django from django.contrib.auth.models import User as wagtailUser from django.contrib.contenttypes.models import ContentType # graphql from graphql.execution.base import ResolveInfo from graphql.language.ast import InlineFragment # graphene import graphene # graphene_django from graphene_django import DjangoObjectType from graphene_django.converter import convert_django_field, String, List # wagtail from wagtail.core.models import Page as wagtailPage, Site as wagtailSite from taggit.managers import TaggableManager from wagtail.core.utils import camelcase_to_underscore # app from ..settings import url_prefix_for_site, RELAY from ..registry import registry from ..permissions import with_page_permissions class User(DjangoObjectType): class Meta: model = wagtailUser exclude_fields = ['password'] class Site(DjangoObjectType): class Meta: model = wagtailSite interface_cls: graphene.Interface = graphene.relay.Node if RELAY else graphene.Interface class Page(interface_cls): if not RELAY: # use opaque ids in Relay id = graphene.Int(required=True) title = graphene.String(required=True) url_path = graphene.String() content_type = graphene.String() slug = graphene.String(required=True) path = graphene.String() depth = graphene.Int() seoTitle = graphene.String() numchild = graphene.Int() revision = graphene.Int() first_published_at = graphene.DateTime() last_published_at = graphene.DateTime() latest_revision_created_at = graphene.DateTime() live = graphene.Boolean() go_live_at = graphene.DateTime() expire_at = graphene.DateTime() expired = graphene.Boolean() locked = graphene.Boolean() draft_title = graphene.String() has_unpublished_changes = graphene.Boolean() def resolve_content_type(self, _info: ResolveInfo): self.content_type = ContentType.objects.get_for_model(self) return self.content_type.app_label + '.' + self.content_type.model_class().__name__ @classmethod def resolve_type(cls, instance, info: ResolveInfo) -> 'Page': mdl = ContentType.objects.get_for_model(instance).model_class() try: model = registry.pages[mdl] except KeyError: # pragma: no cover raise ValueError("Model %s is not a registered GraphQL type" % mdl) return model def resolve_url_path(self, info: ResolveInfo) -> str: self.url_path = cast(str, self.url_path) url_prefix = url_prefix_for_site(info) url = self.url_path if not self.url_path.startswith(url_prefix) else self.url_path[len(url_prefix):] return url.rstrip('/') if RELAY: children = graphene.ConnectionField(lambda *x: PageConnection) else: children = graphene.List(lambda *x: Page) def resolve_children(self, info: ResolveInfo, **_kwargs): query = wagtailPage.objects.child_of(self) return with_page_permissions( info.context, query.specific() ).live().order_by('path').all() @convert_django_field.register(TaggableManager) def convert_field_to_string(field, _registry=None): return List(String, description=field.help_text, required=not field.null) def _resolve_preview(request, view): # pragma: no cover from django.http import QueryDict page = view.get_page() post_data, timestamp = request.session.get(view.session_key, (None, None)) if not isinstance(post_data, str): post_data = '' form = view.get_form(page, QueryDict(post_data)) if not form.is_valid(): raise ValueError("Invalid preview data") form.save(commit=False) return page if RELAY: class PageConnection(graphene.relay.Connection): class Meta: node = Page class Edge: pass def PagesQueryMixin(): # noqa: C901 class Mixin: if RELAY: pages = graphene.ConnectionField(PageConnection) else: pages = graphene.List(Page, parent=graphene.Int()) page = graphene.Field(Page, id=graphene.Int(), url=graphene.String(), revision=graphene.Int(), ) preview = graphene.Field(Page, id=graphene.Int(required=True), ) preview_add = graphene.Field(Page, app_name=graphene.String(), model_name=graphene.String(), parent=graphene.Int(required=True), ) def resolve_pages(self, info: ResolveInfo, parent: int = None, **_kwargs): query = wagtailPage.objects # prefetch specific type pages selections = set(camelcase_to_underscore(f.name.value) for f in info.field_asts[0].selection_set.selections if not isinstance(f, InlineFragment)) for pf in registry.page_prefetch_fields.intersection(selections): query = query.select_related(pf) if parent is not None: parent_page = wagtailPage.objects.filter(id=parent).first() if parent_page is None: raise ValueError(f'Page id={parent} not found.') query = query.child_of(parent_page) return with_page_permissions( info.context, query.specific() ).live().order_by('path').all() def resolve_page(self, info: ResolveInfo, id: int = None, url: str = None, revision: int = None): query = wagtailPage.objects if id is not None: query = query.filter(id=id) elif url is not None: url_prefix = url_prefix_for_site(info) query = query.filter(url_path=url_prefix + url.rstrip('/') + '/') else: # pragma: no cover raise ValueError("One of 'id' or 'url' must be specified") page = with_page_permissions( info.context, query.specific() ).live().first() if page is None: return None if revision is not None: if revision == -1: rev = page.get_latest_revision() else: rev = page.revisions.filter(id=revision).first() if not rev: raise ValueError("Revision %d doesn't exist" % revision) page = rev.as_page_object() page.revision = rev.id return page return page def resolve_preview(self, info: ResolveInfo, id: int): # pragma: no cover from wagtail.admin.views.pages import PreviewOnEdit request = info.context view = PreviewOnEdit(args=('%d' % id, ), request=request) return _resolve_preview(request, view) def resolve_preview_add(self, info: ResolveInfo, app_name: str = 'wagtailcore', model_name: str = 'page', parent: int = None): # pragma: no cover from wagtail.admin.views.pages import PreviewOnCreate request = info.context view = PreviewOnCreate(args=(app_name, model_name, str(parent)), request=request) page = _resolve_preview(request, view) page.id = 0 # force an id, since our schema assumes page.id is an Int! return page # Show in Menu show_in_menus = graphene.List(Page) def resolve_show_in_menus(self, info: ResolveInfo): return with_page_permissions( info.context, wagtailPage.objects.filter(show_in_menus=True) ).live().order_by('path') return Mixin def InfoQueryMixin(): class Mixin: # Root root = graphene.Field(Site) def resolve_root(self, info: ResolveInfo): user = info.context.user if user.is_superuser: return info.context.site else: return None return Mixin PK!3U"wagtail_graphql/types/documents.py# graphql from graphql.execution.base import ResolveInfo # graphene import graphene # graphene_django from graphene_django import DjangoObjectType # graphene_django_optimizer import graphene_django_optimizer as gql_optimizer # wagtail documents from wagtail.documents.models import Document as wagtailDocument # app from ..permissions import with_collection_permissions class Document(DjangoObjectType): class Meta: model = wagtailDocument url = graphene.String() filename = graphene.String() file_extension = graphene.String() def resolve_tags(self: wagtailDocument, _info: ResolveInfo): return self.tags.all() def DocumentQueryMixin(): class Mixin: documents = graphene.List(Document) document = graphene.Field(Document, id=graphene.Int(required=True)) def resolve_documents(self, info: ResolveInfo): return with_collection_permissions( info.context, gql_optimizer.query( wagtailDocument.objects.all(), info ) ) def resolve_document(self, info: ResolveInfo, id: int): doc = with_collection_permissions( info.context, gql_optimizer.query( wagtailDocument.objects.filter(id=id), info ) ).first() return doc return Mixin PK!_Lwagtail_graphql/types/forms.py# graphene import graphene # graphene_django from graphene_django.converter import String, Boolean class FormField(graphene.ObjectType): name = graphene.Field(String) field_type = graphene.Field(String) help_text = graphene.Field(String) required = graphene.Field(Boolean) choices = graphene.Field(String) default_value = graphene.Field(String) label = graphene.Field(String) class FormError(graphene.ObjectType): name = graphene.Field(String) errors = graphene.List(String) PK!j wagtail_graphql/types/images.py# graphql from graphql.execution.base import ResolveInfo # django from django.urls import reverse # graphene import graphene # graphene_django from graphene_django import DjangoObjectType from graphene_django.converter import convert_django_field # graphene_django_optimizer import graphene_django_optimizer as gql_optimizer # wagtail images from wagtail.images.models import Image as wagtailImage from wagtail.images.views.serve import generate_signature # app from ..permissions import with_collection_permissions @convert_django_field.register(wagtailImage) def convert_image(field, _registry=None): return Image(description=field.help_text, required=not field.null) # pragma: no cover class Rect(graphene.ObjectType): left = graphene.Int() top = graphene.Int() right = graphene.Int() bottom = graphene.Int() x = graphene.Int() y = graphene.Int() height = graphene.Int() width = graphene.Int() class Image(DjangoObjectType): class Meta: model = wagtailImage exclude_fields = [ 'focal_point_x', 'focal_point_y', 'focal_point_width', 'focal_point_height', ] has_focal_point = graphene.Boolean() focal_point = graphene.Field(Rect) url = graphene.String(rendition=graphene.String()) url_link = graphene.String(rendition=graphene.String()) def resolve_has_focal_point(self: wagtailImage, _info: ResolveInfo): return self.has_focal_point() def resolve_focal_point(self: wagtailImage, _info: ResolveInfo): return self.get_focal_point() def resolve_tags(self: wagtailImage, _info: ResolveInfo): return self.tags.all() def resolve_url(self: wagtailImage, _info: ResolveInfo, rendition: str = None): if not rendition: if not self.has_focal_point(): rendition = "original" else: fp = self.get_focal_point() rendition = 'fill-%dx%d-c100' % (fp.width, fp.height) return generate_image_url(self, rendition) def resolve_url_link(self: wagtailImage, _info: ResolveInfo, rendition: str = None): if not rendition: if not self.has_focal_point(): rendition = "original" else: fp = self.get_focal_point() rendition = 'fill-%dx%d-c100' % (fp.width, fp.height) return self.get_rendition(rendition).url def generate_image_url(image: wagtailImage, filter_spec: str) -> str: signature = generate_signature(image.pk, filter_spec) url = reverse('wagtailimages_serve', args=(signature, image.pk, filter_spec)) return url def ImageQueryMixin(): class Mixin: images = graphene.List(Image) image = graphene.Field(Image, id=graphene.Int(required=True)) def resolve_images(self, info: ResolveInfo): return with_collection_permissions( info.context, gql_optimizer.query( wagtailImage.objects.all(), info ) ) def resolve_image(self, info: ResolveInfo, id: int): image = with_collection_permissions( info.context, gql_optimizer.query( wagtailImage.objects.filter(id=id), info ) ).first() return image return Mixin PK!iiwagtail_graphql/types/menus.py# python from typing import List # graphql from graphql import ResolveInfo # graphene_django import graphene from graphene_django import DjangoObjectType # wagtailmenus from wagtailmenus.models import FlatMenu, FlatMenuItem, MainMenu, MainMenuItem class MenuItem(DjangoObjectType): class Meta: model = MainMenuItem class Menu(DjangoObjectType): class Meta: model = MainMenu only_fields = ['max_levels', 'menu_items'] class SecondaryMenuItem(DjangoObjectType): class Meta: model = FlatMenuItem class SecondaryMenu(DjangoObjectType): class Meta: model = FlatMenu only_fields = ['title', 'handle', 'heading', 'max_levels', 'menu_items'] def MenusQueryMixin(): class Mixin: main_menu = graphene.List(Menu) secondary_menu = graphene.Field(SecondaryMenu, handle=graphene.String(required=True)) secondary_menus = graphene.List(SecondaryMenu) def resolve_main_menu(self, _info: ResolveInfo) -> List[MainMenu]: return MainMenu.objects.all() def resolve_secondary_menus(self, _info: ResolveInfo) -> List[FlatMenu]: return FlatMenu.objects.all() def resolve_secondary_menu(self, _info, handle: ResolveInfo) -> FlatMenu: return FlatMenu.objects.filter(handle=handle).first() return Mixin PK!:tP__!wagtail_graphql/types/settings.py# graphql from graphql.execution.base import ResolveInfo # graphene import graphene # graphene_django from graphene_django.converter import String # app from ..registry import registry from ..settings import settings_registry class Settings(graphene.Interface): __typename = graphene.Field(String) def SettingsQueryMixin(): class Mixin: if settings_registry: settings = graphene.Field(Settings, name=graphene.String(required=True)) def resolve_settings(self, _info: ResolveInfo, name): try: result = registry.settings[name][1].objects.first() except KeyError: raise ValueError(f"Settings '{name}' not found.") return result else: # pragma: no cover pass return Mixin PK!p|YN!wagtail_graphql/types/snippets.py# django from django.db import models # graphql from graphql.execution.base import ResolveInfo # graphene import graphene # app from ..registry import registry def SnippetsQueryMixin(): class Mixin: if registry.snippets: class Snippet(graphene.types.union.Union): class Meta: types = registry.snippets.types snippets = graphene.List(Snippet, typename=graphene.String(required=True)) def resolve_snippets(self, _info: ResolveInfo, typename: str) -> models.Model: node = registry.snippets_by_name[typename] cls = node._meta.model return cls.objects.all() else: # pragma: no cover pass return Mixin PK!>.=.=$wagtail_graphql/types/streamfield.py# python from typing import Tuple, Callable import datetime # graphql from graphql import GraphQLScalarType from graphql.execution.base import ResolveInfo # graphene import graphene from graphene.utils.str_converters import to_snake_case from graphene.types.generic import GenericScalar from graphene.types import Scalar # graphene_django from graphene_django.converter import convert_django_field, List # dateutil from dateutil.parser import parse as dtparse # wagtail import wagtail.core.blocks import wagtail.images.blocks import wagtail.snippets.blocks from wagtail.core.blocks import Block, ListBlock, StructBlock from wagtail.core.fields import StreamField # app from ..registry import registry from .. import settings # app types from .core import Page, wagtailPage from .images import Image, wagtailImage # types StreamFieldHandlerType = Tuple[graphene.List, Callable[[StreamField, ResolveInfo], list]] @convert_django_field.register(StreamField) def convert_stream_field(field, _registry=None): return Scalar(description=field.help_text, required=not field.null) def _scalar_block(graphene_type): tp = registry.scalar_blocks.get(graphene_type) if not tp: node = '%sBlock' % graphene_type tp = type(node, (graphene.ObjectType,), { 'value': graphene.Field(graphene_type), 'field': graphene.Field(graphene.String), }) registry.scalar_blocks[graphene_type] = tp return tp def _resolve_scalar(key, type_): type_str = str(type_) if type_str == 'DateBlock': def resolve(self, _info: ResolveInfo): return type_(value=dtparse(self), field=key) elif type_str == 'DateTimeBlock': def resolve(self, _info: ResolveInfo): return type_(value=dtparse(self), field=key) elif type_str == 'TimeBlock': def resolve(self, _info: ResolveInfo): return type_(value=datetime.time.fromisoformat(self), field=key) else: def resolve(self, _info: ResolveInfo): return type_(value=self, field=key) return resolve def _page_block(): tp = registry.scalar_blocks.get(Page) if not tp: node = 'PageBlock' tp = type(node, (graphene.ObjectType,), { 'value': graphene.Field(Page), 'field': graphene.Field(graphene.String), }) registry.scalar_blocks[Page] = tp return tp def _resolve_page_block(key, type_): def resolve(self, info: ResolveInfo): return type_(value=_resolve_page(self, info), field=key) return resolve def _image_block(): tp = registry.scalar_blocks.get(Image) if not tp: node = 'ImageBlock' tp = type(node, (graphene.ObjectType,), { 'value': graphene.Field(Image), 'field': graphene.Field(graphene.String), }) registry.scalar_blocks[Image] = tp return tp def _resolve_image_block(key, type_): def resolve(self, info: ResolveInfo): return type_(value=_resolve_image(self, info), field=key) return resolve def _snippet_block(typ): tp = registry.scalar_blocks.get(typ) if not tp: node = '%sBlock' % typ tp = type(node, (graphene.ObjectType,), { 'value': graphene.Field(typ), 'field': graphene.Field(graphene.String), }) registry.scalar_blocks[typ] = tp return tp def _resolve_snippet_block(key, type_, snippet_type): def resolve(self, info: ResolveInfo): info.return_type = snippet_type return type_(value=_resolve_snippet(self, info), field=key) return resolve def _list_block(typ): tp = registry.scalar_blocks.get((List, typ)) if not tp: node = '%sListBlock' % typ tp = type(node, (graphene.ObjectType,), { 'value': graphene.List(typ), 'field': graphene.Field(graphene.String), }) registry.scalar_blocks[(List, typ)] = tp return tp def _resolve_list_block_scalar(key, type_, of_type): type_str = str(of_type) if type_str == 'Date' or type_str == 'DateTime': def resolve(self, _info: ResolveInfo): return type_(value=list(dtparse(s) for s in self), field=key) elif type_str == 'Time': def resolve(self, _info: ResolveInfo): return type_(value=list(datetime.time.fromisoformat(s) for s in self), field=key) else: def resolve(self, _info: ResolveInfo): return type_(value=list(s for s in self), field=key) return resolve def _resolve_list_block(key, type_, of_type): if issubclass(of_type, Scalar): resolve = _resolve_list_block_scalar(key, type_, of_type) elif of_type == Image: def resolve(self, info: ResolveInfo): return type_(value=list(_resolve_image(s, info) for s in self), field=key) elif of_type == Page: def resolve(self, info: ResolveInfo): return type_(value=list(_resolve_page(s, info) for s in self), field=key) elif of_type in registry.snippets.values(): def resolve(self, info: ResolveInfo): info.return_type = of_type return type_(value=list(_resolve_snippet(s, info) for s in self), field=key) else: def resolve(self, info: ResolveInfo): info.return_type = of_type return type_(value=list(of_type(**s) for s in self), field=key) return resolve def _create_root_blocks(block_type_handlers: dict): for k, t in block_type_handlers.items(): if not isinstance(t, tuple) and issubclass(t, Scalar): typ = _scalar_block(t) block_type_handlers[k] = typ, _resolve_scalar(k, typ) elif isinstance(t, tuple) and isinstance(t[0], List): typ = _list_block(t[0].of_type) block_type_handlers[k] = typ, _resolve_list_block(k, typ, t[0].of_type) elif isinstance(t, tuple) and issubclass(t[0], Page): typ = _page_block() block_type_handlers[k] = typ, _resolve_page_block(k, typ) elif isinstance(t, tuple) and issubclass(t[0], Image): typ = _image_block() block_type_handlers[k] = typ, _resolve_image_block(k, typ) elif isinstance(t, tuple) and t[0] in registry.snippets.values(): typ = _snippet_block(t[0]) block_type_handlers[k] = typ, _resolve_snippet_block(k, typ, t[0]) def convert_block(block, block_type_handlers: dict, info: ResolveInfo, is_lazy=True): if is_lazy: block_type = block.get('type') value = block.get('value') else: block_type, value = block[:2] if block_type in block_type_handlers: handler = block_type_handlers[block_type] if isinstance(handler, tuple): tp, resolver = handler return resolver(value, info) else: if isinstance(value, dict): return handler(**value) else: raise NotImplementedError() # pragma: no cover else: raise NotImplementedError() # pragma: no cover def _resolve_type(self, _info: ResolveInfo): return self.__class__ def stream_field_handler(stream_field_name: str, field_name: str, block_type_handlers: dict) -> StreamFieldHandlerType: # add Generic Scalars (default) if settings.LOAD_GENERIC_SCALARS: _scalar_block(GenericScalar) # Unions must reference NamedTypes, so for scalar types we need to create a new type to # encapsulate scalars, page links, images, snippets _create_root_blocks(block_type_handlers) types_ = list(block_type_handlers.values()) for i, t in enumerate(types_): if isinstance(t, tuple): types_[i] = t[0] class Meta: types = tuple(set(types_)) stream_field_type = type( stream_field_name + "Type", (graphene.Union, ), { 'Meta': Meta, 'resolve_type': _resolve_type } ) def resolve_field(self, info: ResolveInfo): field = getattr(self, field_name) return [convert_block(block, block_type_handlers, info, field.is_lazy) for block in field.stream_data] return graphene.List(stream_field_type), resolve_field def _is_compound_block(block): return isinstance(block, StructBlock) def _is_list_block(block): return isinstance(block, ListBlock) def _is_custom_type(block): return hasattr(block, "__graphql_type__") def _add_handler_resolves(dict_params): to_add = {} for k, v in dict_params.items(): if k == 'field': # pragma: no cover raise ValueError("StructBlocks cannot have fields named 'field'") if isinstance(v, tuple): val = v[0] to_add['resolve_' + k] = v[1] elif issubclass(v, (graphene.types.DateTime, graphene.types.Date)): val = v to_add['resolve_' + k] = _resolve_datetime elif issubclass(v, graphene.types.Time): val = v to_add['resolve_' + k] = _resolve_time elif not issubclass(v, Scalar): val = v to_add['resolve_' + k] = _resolve else: val = v dict_params[k] = graphene.Field(val) dict_params.update(to_add) def block_handler(block: Block, app, prefix=''): cls = block.__class__ handler = registry.blocks.get(cls) if handler is None: if _is_custom_type(block): target_block_type = block.__graphql_type__() this_handler = block_handler(target_block_type, app, prefix) if isinstance(this_handler, tuple): raise NotImplementedError() if hasattr(block, '__graphql_resolve__'): resolver = _resolve_custom(block, this_handler) elif issubclass(target_block_type, Scalar): resolver = _resolve_generic_scalar else: raise TypeError("Non Scalar custom types need an explicit __graphql_resolve__ method.") handler = (lambda x: this_handler, resolver) elif _is_compound_block(block): node = prefix + cls.__name__ dict_params = dict( (n, block_handler(block_type, app, prefix)) for n, block_type in block.child_blocks.items() ) _add_handler_resolves(dict_params) dict_params.update({ # add the field name 'field': graphene.Field(graphene.String), 'resolve_field': lambda *x: block.name, }) tp = type(node, (graphene.ObjectType,), dict_params) handler = tp registry.blocks[cls] = handler elif _is_list_block(block): this_handler = block_handler(block.child_block, app, prefix) if isinstance(this_handler, tuple): handler = List(this_handler[0]), _resolve_list(*this_handler) else: handler = List(this_handler), _resolve_simple_list else: handler = GenericScalar if cls == wagtail.snippets.blocks.SnippetChooserBlock: handler = (handler[0](block), handler[1]) # type: ignore return handler def _snippet_handler(block): tp = registry.snippets[block.target_model] return tp def _resolve_snippet(self, info: ResolveInfo): if self is None: # pragma: no cover return None field = to_snake_case(info.field_name) id_ = self if isinstance(self, int) else getattr(self, field) if hasattr(info.return_type, 'graphene_type'): cls = info.return_type.graphene_type._meta.model else: cls = info.return_type._meta.model obj = cls.objects.filter(id=id_).first() return obj def _resolve_image(self, info: ResolveInfo): if self is None: # pragma: no cover return None field = to_snake_case(info.field_name) id_ = self if isinstance(self, int) else getattr(self, field) return wagtailImage.objects.filter(id=id_).first() def _resolve_page(self, info: ResolveInfo): if self is None: # pragma: no cover return None field = to_snake_case(info.field_name) id_ = self if isinstance(self, int) else getattr(self, field) return wagtailPage.objects.filter(id=id_).specific().first() def _resolve(self, info: ResolveInfo): if self is None: # pragma: no cover return None field = to_snake_case(info.field_name) data = getattr(self, field) cls = info.return_type return cls.graphene_type(**data) def _resolve_datetime(self, info: ResolveInfo): if self is None: # pragma: no cover return None field = to_snake_case(info.field_name) data = getattr(self, field) return dtparse(data) if data else None def _resolve_time(self, info: ResolveInfo): if self is None: # pragma: no cover return None field = to_snake_case(info.field_name) data = getattr(self, field) return datetime.time.fromisoformat(data) if data else None def _resolve_custom(block, hdl): def _inner(self, info: ResolveInfo): if self is None: # pragma: no cover return None cls = info.return_type if isinstance(self, dict): data = self else: data = getattr(self, info.field_name) value = block.__graphql_resolve__(data, info) if hasattr(cls, "serialize"): return cls.serialize(value) return hdl(**value) return _inner def _resolve_generic_scalar(self, info: ResolveInfo): if self is None: # pragma: no cover return None data = getattr(self, info.field_name) cls = info.return_type return cls.serialize(data) def _resolve_simple_list(self, info: ResolveInfo): if self is None: # pragma: no cover return None field = to_snake_case(info.field_name) data = getattr(self, field) cls = info.return_type.of_type if isinstance(cls, (Scalar, GraphQLScalarType)): return list(d for d in data) return list(cls(**d) for d in data) def _resolve_list(tp, inner_resolver): def resolve(self, info: ResolveInfo): if self is None: # pragma: no cover return None field = to_snake_case(info.field_name) ids = getattr(self, field) info.return_type = tp return list(inner_resolver(i, info) for i in ids) return resolve registry.blocks.update({ # choosers wagtail.images.blocks.ImageChooserBlock: (Image, _resolve_image), wagtail.core.blocks.PageChooserBlock: (Page, _resolve_page), wagtail.snippets.blocks.SnippetChooserBlock: (_snippet_handler, _resolve_snippet), # standard fields wagtail.core.blocks.CharBlock: graphene.types.String, wagtail.core.blocks.URLBlock: graphene.types.String, wagtail.core.blocks.DateBlock: graphene.types.Date, wagtail.core.blocks.DateTimeBlock: graphene.types.DateTime, wagtail.core.blocks.BooleanBlock: graphene.types.Boolean, wagtail.core.blocks.IntegerBlock: graphene.types.Int, wagtail.core.blocks.FloatBlock: graphene.types.Float, wagtail.core.blocks.DecimalBlock: graphene.types.String, wagtail.core.blocks.TextBlock: graphene.types.String, wagtail.core.blocks.TimeBlock: graphene.types.Time, wagtail.core.blocks.RichTextBlock: graphene.types.String, wagtail.core.blocks.RawHTMLBlock: graphene.types.String, wagtail.core.blocks.BlockQuoteBlock: graphene.types.String, wagtail.core.blocks.ChoiceBlock: graphene.types.String, wagtail.core.blocks.RegexBlock: graphene.types.String, wagtail.core.blocks.EmailBlock: graphene.types.String, wagtail.core.blocks.StaticBlock: graphene.types.String, }) PK!X//'wagtail_graphql-0.3.0.dist-info/LICENSEMIT License Copyright (c) 2018 Tiago Requeijo Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. PK!H|SU%wagtail_graphql-0.3.0.dist-info/WHEELA н#J;/"d&F[xjxڠ5 G7#hi"h4?h>0k:baPn^PK!H m3.(wagtail_graphql-0.3.0.dist-info/METADATAXms۸_;H\Ҍ.ۉNθ9 "!1I0hYgPo4"\.v}v[Uh#U5`^bf|j,Sk-Dvx7lʒW5ƪ[3e圙N+f316Ҋ*ES̭ I8NUXlr.SQl0!33pzͭǍ͕+ɧ}_!%dXq&>7QD{1 ~{s9l0`?_q]ku'2z-ZϲՔj *$-_ϕ_={"H G<X),~"0QQ )stSvCFVB.ޛGvα>P* 8+re+N?2Ael˽ޢ8|XdjoUeEeWȵ0n35,pUTR%RU%Yfɫݗ?{_nr!=ݭ﷎ZM JOB:9p@tKîGu&rARn]fcC]傩1%V %kmHe3 ~lǐ1)"VFuɒLLѰb5ISpcҲ¿Vٔ$j"hnRh8;q m(~Ef f ) 5au^cI[!t6'vb3hdrQQ-f!EȬdN;FgY)AMߗvn-]>ťw_Cv1v7`7| ˴s϶( i9ӞV呟s.3ifF>_?x:ri~9Ǒ#ɔh2Aw-5j,$@*QFa\ŚZr2wbyp&xŧӳG(%@hk8d[?]UX%L7A/ \\ !^}3x< ߢh%K hr+9f{@]RiWƲ 3 .h NB;߂e  lK!scúFrDS/R:X39am~7Q=a{^օ ~B[o\DH2?X$=l$0v<'qҬ9 oxMH5EϤ)#,kj'߶;)fx;7z>3s=*yZZmTwj_.*@]=EB6qjҵEF tip@/kQCܢ^@6Yvt7ɂWmh!+;nnnslKEs$ 1x'k bmaܦ9+"Jowָ~7&P +I'NU0^p9&P:61K d}18!Fh:|/9zߒ/2?< 9 lX>=l qt$ķMaeEV%\n"+F@ʥS- HДëyL,z+sic0Y.FoZo)nqV 2p\ D@ؽ80Uon}mgAXqFwO&=RR'0?RZ6tLg\_DZ^Q;wP5/k#vѽPܳQN i4TrBe;!y{3i^62i4_eO:u?9fC0s/υYvL&^ <EHRԎ r[]RrYQ`–u/؉J]q-~y&E=K6m@OHUU=&=1{Z} 79]1trXP[55+^oà^YwUX"0!YV|CxǮ>~(e[čm&8@|AylL M Dh*,QPK!H 2,&wagtail_graphql-0.3.0.dist-info/RECORDuɲXy~ o5ATZQחYYa&9qg)y={&-Ȑ1){< ʂ ާ˟5.\"8cK!fxӸiCV8Ewn2B nA\p%8rMvݻ'+[fQ/"=37CGpnPX2џ·ᯔPhfx*iȾvBSO囙N؉I ʭ>N˛{=M4a:ZSFnYR9Bf xx'X0ms`;^|*WA_ 0k2͍L}u]7*ARw# "fXK`Ԩ :n{^?b:kيSd G7-*=_bUE'm_@ZT:79Eji11=_Aa5A9GѸAZX69_i*EN]MA2[} a|gC[{Mix8▝vtNahKB~ӌƎf"0SBe5zzw_B",)]nQ|Oy7qhezk<-ם!aAq` J.ƁY)S蕂6u u JuA6~Eɛ~?jU'NN-bdq!RB;O]d;{IR#lPA#N4e~T>-B 995v-[lM"t}c`bcβʘt?DVզWnK ?PK!9uwwagtail_graphql/__init__.pyPK!;_+Pwagtail_graphql/actions.pyPK!)2wagtail_graphql/apps.pyPK!M wagtail_graphql/permissions.pyPK!߂)wagtail_graphql/registry.pyPK!H1wagtail_graphql/relay.pyPK!qpmm2wagtail_graphql/schema.pyPK!,j;wagtail_graphql/settings.pyPK! {TT!y>wagtail_graphql/types/__init__.pyPK!Uv_RR Ewagtail_graphql/types/auth.pyPK!rBh6ss#Jwagtail_graphql/types/converters.pyPK!cw w MLwagtail_graphql/types/core.pyPK!3U"lwagtail_graphql/types/documents.pyPK!_Lswagtail_graphql/types/forms.pyPK!j @uwagtail_graphql/types/images.pyPK!iiwagtail_graphql/types/menus.pyPK!:tP__!wagtail_graphql/types/settings.pyPK!p|YN!Qwagtail_graphql/types/snippets.pyPK!>.=.=$wagtail_graphql/types/streamfield.pyPK!X//'wagtail_graphql-0.3.0.dist-info/LICENSEPK!H|SU%wagtail_graphql-0.3.0.dist-info/WHEELPK!H m3.($wagtail_graphql-0.3.0.dist-info/METADATAPK!H 2,&wagtail_graphql-0.3.0.dist-info/RECORDPK