GraphQL schema
A GraphQL schema is generated by passing all the operation functions (queries, mutations, subscriptions) to apischema.graphql.graphql_schema.
The parameters and return types of these functions are then processed by apischema to generate the Query/Mutation/Subscription types with their resolvers/subscribers, which are then passed to graphql.GraphQLSchema.
In fact, graphql_schema is just a wrapper around graphql.GraphQLSchema (same parameters plus a few extras); it uses the apischema abstraction to build GraphQL object types directly from your code.
Operations metadata
GraphQL operations can be passed to graphql_schema either as plain functions or wrapped in apischema.graphql.Query/apischema.graphql.Mutation/apischema.graphql.Subscription. These wrappers have the same parameters as apischema.graphql.resolver: alias, conversions, error_handler, order and schema (Subscription has an additional parameter, resolver, covered under Subscriptions).
from dataclasses import dataclass

from graphql import print_schema

from apischema.graphql import Query, graphql_schema, resolver

@dataclass
class Foo:
    @resolver
    async def bar(self, arg: int = 0) -> str:
        ...

async def get_foo() -> Foo:
    ...

schema = graphql_schema(query=[Query(get_foo, alias="foo", error_handler=None)])
schema_str = """\
type Query {
  foo: Foo
}

type Foo {
  bar(arg: Int! = 0): String!
}"""
assert print_schema(schema) == schema_str
camelCase
GraphQL uses camelCase as a convention for resolvers; apischema follows this convention by automatically converting all resolver names (and their parameters) to camelCase. graphql_schema has an aliaser parameter if you want to use another case.
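To make the conversion concrete, here is a minimal standard-library sketch of what a snake_case to camelCase conversion does to a name. The helper to_camel_case is written for illustration only; apischema ships its own implementation, which may treat edge cases differently. It is also an assumption here that an aliaser is simply a function from a name to a name.

```python
def to_camel_case(name: str) -> str:
    """Convert a snake_case name to camelCase (illustrative helper, not apischema's)."""
    first, *rest = name.split("_")
    # Keep the first word as written and capitalize the first letter of the others.
    return first + "".join(word[:1].upper() + word[1:] for word in rest)


print(to_camel_case("get_foo"))     # getFoo
print(to_camel_case("foo_or_bar"))  # fooOrBar
print(to_camel_case("hello"))       # hello
```

A function of this shape is the kind of callable one would expect to pass as aliaser; the apischema reference should be checked for the exact contract.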
Type names
Schema types are named the same way as in the generated JSON schema: the type name is used by default, and it can be overridden using apischema.type_name.
from dataclasses import dataclass

from graphql import print_schema

from apischema import type_name
from apischema.graphql import graphql_schema

@type_name("Foo")
@dataclass
class FooFoo:
    bar: int

def foo() -> FooFoo | None:
    ...

schema = graphql_schema(query=[foo])
schema_str = """\
type Query {
  foo: Foo
}

type Foo {
  bar: Int!
}"""
assert print_schema(schema) == schema_str
Note
Type names can be set separately for the JSON schema and the GraphQL schema using the named parameters of type_name. Indeed, type_name("foo") is equivalent to type_name(json_schema="foo", graphql="foo").
However, in a GraphQL schema, unions must be named, so any typing.Union used should be annotated with apischema.type_name. graphql_schema also provides a union_name parameter, a function that generates a type name from the names of the union members. The default union_name is "Or".join, meaning typing.Union[Foo, Bar] will result in union FooOrBar = Foo | Bar.
from dataclasses import dataclass

from graphql import print_schema

from apischema.graphql import graphql_schema

@dataclass
class Foo:
    foo: int

@dataclass
class Bar:
    bar: int

def foo_or_bar() -> Foo | Bar:
    ...

# union_name default value is made explicit here
schema = graphql_schema(query=[foo_or_bar], union_name="Or".join)
schema_str = """\
type Query {
  fooOrBar: FooOrBar!
}

union FooOrBar = Foo | Bar

type Foo {
  foo: Int!
}

type Bar {
  bar: Int!
}"""
assert print_schema(schema) == schema_str
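The union naming function can be tried on its own, without building a schema. The sketch below assumes, as the default "Or".join suggests, that the function receives the sequence of member type names; underscore_union is a hypothetical alternative written only for illustration.

```python
from typing import Sequence

# The documented default: join the member names with "Or".
default_union_name = "Or".join


def underscore_union(names: Sequence[str]) -> str:
    """Hypothetical alternative: join member names with underscores, then add a suffix."""
    return "_".join(names) + "_Union"


members = ["Foo", "Bar"]
print(default_union_name(members))  # FooOrBar
print(underscore_union(members))    # Foo_Bar_Union
```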
Enum metadata
Unlike dataclasses, Enum doesn't provide a way to set metadata on its members, in particular a description or a deprecation reason. They can however be passed using the enum_schemas parameter of graphql_schema.
from enum import Enum

from graphql import graphql_sync
from graphql.utilities import print_schema

from apischema import schema
from apischema.graphql import graphql_schema

class MyEnum(Enum):
    FOO = "FOO"
    BAR = "BAR"

def echo(enum: MyEnum) -> MyEnum:
    return enum

schema_ = graphql_schema(
    query=[echo], enum_schemas={MyEnum.FOO: schema(description="foo")}
)
schema_str = '''\
type Query {
  echo(enum: MyEnum!): MyEnum!
}

enum MyEnum {
  """foo"""
  FOO
  BAR
}'''
assert print_schema(schema_) == schema_str
assert graphql_sync(schema_, "{echo(enum: FOO)}").data == {"echo": "FOO"}
Additional types
apischema only includes in the schema the types used in resolver annotations. However, other types can be added using the types parameter of graphql_schema. This is especially useful for adding interface implementations when only the interface appears in resolver types.
from dataclasses import dataclass

from graphql import print_schema

from apischema.graphql import graphql_schema, interface

@interface
@dataclass
class Bar:
    bar: int

@dataclass
class Foo(Bar):
    baz: str

def bar() -> Bar:
    ...

schema = graphql_schema(query=[bar], types=[Foo])
# type Foo would not have been present if Foo had not been passed in types
schema_str = """\
type Foo implements Bar {
  bar: Int!
  baz: String!
}

interface Bar {
  bar: Int!
}

type Query {
  bar: Bar!
}"""
assert print_schema(schema) == schema_str
Subscriptions
Subscriptions are particular operations which must return an AsyncIterable; this event generator can come with a dedicated resolver to post-process the event.
Event generator only
import asyncio
from typing import AsyncIterable

import graphql
from graphql import print_schema

from apischema.graphql import graphql_schema

def hello() -> str:
    return "world"

async def events() -> AsyncIterable[str]:
    yield "bonjour"
    yield "au revoir"

schema = graphql_schema(query=[hello], subscription=[events])
schema_str = """\
type Query {
  hello: String!
}

type Subscription {
  events: String!
}"""
assert print_schema(schema) == schema_str

async def test():
    subscription = await graphql.subscribe(
        schema, graphql.parse("subscription {events}")
    )
    assert [event.data async for event in subscription] == [
        {"events": "bonjour"},
        {"events": "au revoir"},
    ]

asyncio.run(test())
Note
Because there is no post-processing of the generated events in a dedicated resolver, error_handler cannot be called, but it will still modify the type of the event.
Event generator + resolver
A resolver can be added by using the resolver parameter of Subscription. In this case, apischema maps the subscription name, parameters and return type onto the resolver instead of the event generator. This allows the same event generator to be used with several resolvers to create different subscriptions.
The first resolver argument will be the event yielded by the event generator.
import asyncio
from dataclasses import dataclass
from typing import AsyncIterable

import graphql
from graphql import print_schema

from apischema.graphql import Subscription, graphql_schema

def hello() -> str:
    return "world"

async def events() -> AsyncIterable[str]:
    yield "bonjour"
    yield "au revoir"

@dataclass
class Message:
    body: str

# Message can also be used directly as a function
schema = graphql_schema(
    query=[hello],
    subscription=[Subscription(events, alias="messageReceived", resolver=Message)],
)
schema_str = """\
type Query {
  hello: String!
}

type Subscription {
  messageReceived: Message!
}

type Message {
  body: String!
}"""
assert print_schema(schema) == schema_str

async def test():
    subscription = await graphql.subscribe(
        schema, graphql.parse("subscription {messageReceived {body}}")
    )
    assert [event.data async for event in subscription] == [
        {"messageReceived": {"body": "bonjour"}},
        {"messageReceived": {"body": "au revoir"}},
    ]

asyncio.run(test())
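Conceptually, pairing an event generator with a resolver amounts to mapping the resolver over the yielded events. The following standard-library sketch illustrates that idea only; it is not how apischema or graphql-core implement subscriptions, and with_resolver is a hypothetical helper introduced for this example.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncIterator, Callable, List, TypeVar

E = TypeVar("E")
R = TypeVar("R")


@dataclass
class Message:
    body: str


async def events() -> AsyncIterator[str]:
    yield "bonjour"
    yield "au revoir"


async def with_resolver(
    source: AsyncIterator[E], resolver: Callable[[E], R]
) -> AsyncIterator[R]:
    """Hypothetical helper: apply `resolver` to each event as it is yielded."""
    async for event in source:
        # The event is passed as the resolver's first argument.
        yield resolver(event)


async def collect() -> List[Message]:
    # The dataclass itself serves as the resolver, building a Message per event.
    return [message async for message in with_resolver(events(), Message)]


messages = asyncio.run(collect())
print(messages)
```

The same events generator could be wrapped with a different callable (for example str.upper) to obtain a second stream, which mirrors the reuse described above.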