GraphQL schema

The GraphQL schema is generated by passing all the operation (query/mutation/subscription) functions to apischema.graphql.graphql_schema.

Function parameters and return types are then processed by apischema to generate the Query/Mutation/Subscription types with their resolvers/subscribers, which are then passed to graphql.GraphQLSchema.

In fact, graphql_schema is just a wrapper around graphql.GraphQLSchema (it takes the same parameters plus a few extras); it uses apischema's abstractions to build GraphQL object types directly from your code.

Operations metadata

GraphQL operations can be passed to graphql_schema either as plain functions or wrapped in apischema.graphql.Query/apischema.graphql.Mutation/apischema.graphql.Subscription. These wrappers have the same parameters as apischema.graphql.resolver: alias, conversions, error_handler, order and schema (Subscription has an additional resolver parameter, described below).

from dataclasses import dataclass

from graphql import print_schema

from apischema.graphql import Query, graphql_schema, resolver


@dataclass
class Foo:
    @resolver
    async def bar(self, arg: int = 0) -> str:
        ...


async def get_foo() -> Foo:
    ...


schema = graphql_schema(query=[Query(get_foo, alias="foo", error_handler=None)])
schema_str = """\
type Query {
  foo: Foo
}

type Foo {
  bar(arg: Int! = 0): String!
}"""
assert print_schema(schema) == schema_str

camelCase

GraphQL uses camelCase as a naming convention for resolvers; apischema follows this convention by automatically converting all resolver names (and their parameters) to camelCase. graphql_schema has an aliaser parameter if you want to use another case.
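
For instance, a minimal sketch (not from the original documentation, assuming aliaser accepts a plain str -> str callable): passing an identity function keeps the snake_case names as-is.

from dataclasses import dataclass

from graphql import print_schema

from apischema.graphql import graphql_schema


@dataclass
class Foo:
    my_bar: int


def get_foo() -> Foo:
    ...


# Hypothetical usage: an identity aliaser disables the camelCase conversion,
# so field and resolver names keep their original snake_case form.
schema = graphql_schema(query=[get_foo], aliaser=lambda s: s)
print(print_schema(schema))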

Type names

Schema types are named the same way they are in the generated JSON schema: the type name is used by default, and it can be overridden using apischema.type_name.

from dataclasses import dataclass

from graphql import print_schema

from apischema import type_name
from apischema.graphql import graphql_schema


@type_name("Foo")
@dataclass
class FooFoo:
    bar: int


def foo() -> FooFoo | None:
    ...


schema = graphql_schema(query=[foo])
schema_str = """\
type Query {
  foo: Foo
}

type Foo {
  bar: Int!
}"""
assert print_schema(schema) == schema_str

Note

Type names can be distinguished between JSON schema and GraphQL schema using type_name's named parameters. Indeed, type_name("foo") is equivalent to type_name(json_schema="foo", graphql="foo").
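
For example, a short sketch (class and type names here are illustrative) overriding only the GraphQL name:

from dataclasses import dataclass

from apischema import type_name


# Only the GraphQL type is renamed; the JSON schema keeps the default name.
@type_name(graphql="Bar")
@dataclass
class BarBar:
    bar: int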

However, in a GraphQL schema, unions must be named, so any typing.Union used should be annotated with apischema.type_name. graphql_schema also provides a union_name parameter which can be passed a function generating the type name from the union arguments. The default union_name is "Or".join, meaning typing.Union[Foo, Bar] will result in union FooOrBar = Foo | Bar.

from dataclasses import dataclass

from graphql import print_schema

from apischema.graphql import graphql_schema


@dataclass
class Foo:
    foo: int


@dataclass
class Bar:
    bar: int


def foo_or_bar() -> Foo | Bar:
    ...


# union_name default value is made explicit here
schema = graphql_schema(query=[foo_or_bar], union_name="Or".join)
schema_str = """\
type Query {
  fooOrBar: FooOrBar!
}

union FooOrBar = Foo | Bar

type Foo {
  foo: Int!
}

type Bar {
  bar: Int!
}"""
assert print_schema(schema) == schema_str

Enum metadata

Contrary to dataclasses, Enum doesn't provide a way to set metadata, notably a description but also a deprecation reason, for its members. They can however be passed using the enum_schemas parameter of graphql_schema.

from enum import Enum

from graphql import graphql_sync
from graphql.utilities import print_schema

from apischema import schema
from apischema.graphql import graphql_schema


class MyEnum(Enum):
    FOO = "FOO"
    BAR = "BAR"


def echo(enum: MyEnum) -> MyEnum:
    return enum


schema_ = graphql_schema(
    query=[echo], enum_schemas={MyEnum.FOO: schema(description="foo")}
)
schema_str = '''\
type Query {
  echo(enum: MyEnum!): MyEnum!
}

enum MyEnum {
  """foo"""
  FOO
  BAR
}'''
assert print_schema(schema_) == schema_str
assert graphql_sync(schema_, "{echo(enum: FOO)}").data == {"echo": "FOO"}
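
The deprecation reason mentioned above can be provided the same way; a hedged sketch, assuming the deprecated parameter of apischema.schema is also honored by enum_schemas (output printed rather than asserted):

from enum import Enum

from graphql.utilities import print_schema

from apischema import schema
from apischema.graphql import graphql_schema


class Color(Enum):
    RED = "RED"
    BLUE = "BLUE"


def color() -> Color:
    ...


# Assumption: schema(deprecated=...) should surface as a @deprecated directive
# on the BLUE value in the printed schema.
schema_ = graphql_schema(
    query=[color], enum_schemas={Color.BLUE: schema(deprecated="use RED instead")}
)
print(print_schema(schema_))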

Additional types

apischema will only include in the schema the types that annotate resolvers. However, it is possible to add other types using the types parameter of graphql_schema. This is especially useful to add interface implementations when only the interface is used in resolver types.

from dataclasses import dataclass

from graphql import print_schema

from apischema.graphql import graphql_schema, interface


@interface
@dataclass
class Bar:
    bar: int


@dataclass
class Foo(Bar):
    baz: str


def bar() -> Bar:
    ...


schema = graphql_schema(query=[bar], types=[Foo])
# type Foo would not have been present if Foo was not put in types
schema_str = """\
type Foo implements Bar {
  bar: Int!
  baz: String!
}

interface Bar {
  bar: Int!
}

type Query {
  bar: Bar!
}"""
assert print_schema(schema) == schema_str

Subscriptions

Subscriptions are particular operations which must return an AsyncIterable; this event generator can be paired with a dedicated resolver to post-process the events.

Event generator only

import asyncio
from typing import AsyncIterable

import graphql
from graphql import print_schema

from apischema.graphql import graphql_schema


def hello() -> str:
    return "world"


async def events() -> AsyncIterable[str]:
    yield "bonjour"
    yield "au revoir"


schema = graphql_schema(query=[hello], subscription=[events])
schema_str = """\
type Query {
  hello: String!
}

type Subscription {
  events: String!
}"""
assert print_schema(schema) == schema_str


async def test():
    subscription = await graphql.subscribe(
        schema, graphql.parse("subscription {events}")
    )
    assert [event.data async for event in subscription] == [
        {"events": "bonjour"},
        {"events": "au revoir"},
    ]


asyncio.run(test())

Note

Because the generated events are not post-processed by a dedicated resolver, error_handler cannot be called; it will however still modify the type of the event.
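
To illustrate, a hedged sketch reusing the events generator above, assuming that passing error_handler=None to the Subscription wrapper makes the event type nullable (String instead of String!); the schema is printed rather than asserted:

from typing import AsyncIterable

from graphql import print_schema

from apischema.graphql import Subscription, graphql_schema


def hello() -> str:
    return "world"


async def events() -> AsyncIterable[str]:
    yield "bonjour"
    yield "au revoir"


# Assumption: error_handler=None is never called here (no dedicated resolver),
# but it still changes the event type to be nullable.
schema = graphql_schema(
    query=[hello], subscription=[Subscription(events, error_handler=None)]
)
print(print_schema(schema))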

Event generator + resolver

A resolver can be added using the resolver parameter of Subscription. In this case, apischema will map the subscription name, parameters and return type onto the resolver instead of the event generator. This allows using the same event generator with several resolvers to create different subscriptions.

The first resolver argument will be the event yielded by the event generator.

import asyncio
from dataclasses import dataclass
from typing import AsyncIterable

import graphql
from graphql import print_schema

from apischema.graphql import Subscription, graphql_schema


def hello() -> str:
    return "world"


async def events() -> AsyncIterable[str]:
    yield "bonjour"
    yield "au revoir"


@dataclass
class Message:
    body: str


# Message can also be used directly as a function
schema = graphql_schema(
    query=[hello],
    subscription=[Subscription(events, alias="messageReceived", resolver=Message)],
)
schema_str = """\
type Query {
  hello: String!
}

type Subscription {
  messageReceived: Message!
}

type Message {
  body: String!
}"""
assert print_schema(schema) == schema_str


async def test():
    subscription = await graphql.subscribe(
        schema, graphql.parse("subscription {messageReceived {body}}")
    )
    assert [event.data async for event in subscription] == [
        {"messageReceived": {"body": "bonjour"}},
        {"messageReceived": {"body": "au revoir"}},
    ]


asyncio.run(test())