Subscription
Subscription is the third operation type available in GraphQL. It brings an event-based subscription model to the engine.
Choosing and wiring up the event technology (Google Pub/Sub, NATS, Redis, etc.) is up to you. The examples here focus purely on the engine side of the feature.
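To make the idea of a pluggable event source more concrete, here is a minimal sketch of an in-memory publish/subscribe bus built on asyncio.Queue. It is not part of Tartiflette: the InMemoryPubSub class and its publish and listen methods are hypothetical names used only for illustration, standing in for a real broker client. The point is that the event source only needs to be consumable as an async generator.

```python
import asyncio
from typing import Any, AsyncIterator, Dict, List


class InMemoryPubSub:
    """Hypothetical stand-in for a real broker (Redis, NATS, Google Pub/Sub)."""

    def __init__(self) -> None:
        # One list of subscriber queues per topic.
        self._subscribers: Dict[str, List[asyncio.Queue]] = {}

    def publish(self, topic: str, message: Any) -> None:
        # Deliver the message to every queue currently listening on the topic.
        for queue in self._subscribers.get(topic, []):
            queue.put_nowait(message)

    async def listen(self, topic: str) -> AsyncIterator[Any]:
        # Register a private queue, then yield messages as they arrive.
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers.setdefault(topic, []).append(queue)
        try:
            while True:
                yield await queue.get()
        finally:
            # Unregister once the consumer stops iterating.
            self._subscribers[topic].remove(queue)
```

In this sketch a listener is only registered on its first iteration, so messages published before that point are not delivered to it. A real broker client would come with its own delivery guarantees.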
Engine: How to execute a subscription?
The engine is responsible for executing Query and Mutation operations as well as Subscription operations. The former are executed by the execute method, whereas a Subscription is executed by the subscribe method.
The parameters available on the subscribe method are:
query (Union[str, bytes]): the GraphQL request/query as a UTF-8 encoded string
operation_name (Optional[str]): the name of the operation to execute
context (Optional[Any]): a value containing anything you might need, available throughout the whole execution
variables (Optional[Dict[str, Any]]): the variables provided in the GraphQL request
initial_value (Optional[Any]): an initial value which will be forwarded to the resolvers of root type (Query/Mutation/Subscription) fields
import asyncio

from tartiflette import create_engine


async def main():
    engine = await create_engine("myDsl.graphql")

    async for result in engine.subscribe(
        query="subscription MyLiveVideo($id: String!) { videoLive(id: $id) { id viewsNumber } }",
        operation_name="MyLiveVideo",
        context={
            # MySQLClient and AuthInfo are placeholders for objects defined by your application.
            "mysql_client": MySQLClient(),
            "auth_info": AuthInfo(),
        },
        variables={
            "id": "1234",
        },
        initial_value={},
    ):
        # Each yielded `result` will contain something like:
        # {
        #     "data": {
        #         "videoLive": {
        #             "id": "1234",
        #             "viewsNumber": 87564
        #         }
        #     }
        # }
        pass


asyncio.run(main())
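Because subscribe is consumed with async for, a caller often needs a way to stop after a bounded number of results, for instance in tests. The following sketch shows one way to do that. The fake_subscribe generator is a hypothetical stand-in for engine.subscribe(...) so that the example runs without a schema, and take is an illustrative helper, not a Tartiflette function. It assumes the stream is an async generator, so that aclose() is available.

```python
import asyncio
from typing import Any, AsyncGenerator, Dict, List


async def fake_subscribe(count: int) -> AsyncGenerator[Dict[str, Any], None]:
    # Hypothetical stand-in for engine.subscribe(...): yields result payloads.
    for views in range(count):
        yield {"data": {"videoLive": {"id": "1234", "viewsNumber": views}}}
        await asyncio.sleep(0)


async def take(
    stream: AsyncGenerator[Dict[str, Any], None], limit: int
) -> List[Dict[str, Any]]:
    # Collect at most `limit` results, then close the stream explicitly so the
    # underlying generator can run its cleanup code.
    results: List[Dict[str, Any]] = []
    try:
        if limit > 0:
            async for result in stream:
                results.append(result)
                if len(results) >= limit:
                    break
    finally:
        await stream.aclose()
    return results
```

Closing the generator explicitly, rather than relying on garbage collection, gives the subscription source a predictable point at which to release resources such as broker connections.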
@Subscription: How to subscribe to a field?
In the Tartiflette engine, to subscribe to a field, simply apply the @Subscription decorator to a callable that returns an async generator. That's all there is to it. For advanced use cases, consider putting a @Resolver on top of a Subscription (see below).
import asyncio

from tartiflette import Subscription

from recipes_manager.data import RECIPES


@Subscription("Subscription.launchAndWaitCookingTimer")
async def subscribe_subscription_launch_and_wait_cooking_timer(
    parent, args, ctx, info
):
    # Look up the requested recipe.
    recipe = None
    for recipe_item in RECIPES:
        if recipe_item["id"] == args["id"]:
            recipe = recipe_item

    if not recipe:
        raise Exception(f"The recipe < {args['id']} > does not exist.")

    # Emit one payload per second while cooking. Payloads are keyed by the
    # field name, matching the shape of the final payload below.
    for i in range(recipe["cookingTime"]):
        yield {
            "launchAndWaitCookingTimer": {
                "remainingTime": recipe["cookingTime"] - i,
                "status": "COOKING",
            },
        }
        await asyncio.sleep(1)

    yield {
        "launchAndWaitCookingTimer": {
            "remainingTime": 0,
            "status": "COOKED",
        },
    }
Decorator signature
name (str): fully qualified field name to resolve
schema_name (str = "default"): name of the schema to which the subscription is linked
arguments_coercer (Optional[Callable] = None): callable used to coerce the field arguments
concurrently (Optional[bool] = None): determines whether the output list of the decorated field should be coerced concurrently
The arguments_coercer parameter provides an easy way to override the default callable used internally by Tartiflette to coerce the arguments of the field. It behaves like the custom_default_arguments_coercer parameter at engine initialisation, but affects only the decorated field.
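As a rough illustration of what such a callable might look like, the sketch below assumes that the coercer is an async callable which receives one awaitable per argument and returns the coerced values in the same order. That signature is an assumption and is not confirmed by this page, so check it against the engine documentation before relying on it. The coerce_argument coroutine is a hypothetical stand-in for the per-argument work done by the engine, so that the example runs on its own.

```python
import asyncio
from typing import Any, Awaitable, List


async def sequential_arguments_coercer(*coroutines: Awaitable[Any]) -> List[Any]:
    # Assumed signature: one awaitable per argument, results returned in order.
    # Awaiting them one by one gives up concurrency in exchange for a
    # predictable order of execution.
    return [await coroutine for coroutine in coroutines]


async def coerce_argument(value: str) -> int:
    # Hypothetical stand-in for the per-argument coercion done by the engine.
    await asyncio.sleep(0)
    return int(value)
```

If the assumed signature holds, a callable like this would be passed as arguments_coercer=sequential_arguments_coercer to the decorator, and would affect only that field.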
@Resolver: Manipulating and shaping the result of a @Subscription function
In some cases, especially when you use tools like Redis or Google Pub/Sub, the yielded value won't be structured as the schema expects. In addition to the @Subscription decorator, you can implement a @Resolver wrapper to shape the data according to the return type.
import asyncio

from tartiflette import Resolver, Subscription

from recipes_manager.data import RECIPES


@Resolver("Subscription.launchAndWaitCookingTimer")
async def resolve_subscription_launch_and_wait_cooking_timer(
    parent, args, ctx, info
):
    # `parent` is the raw value yielded by the subscription generator below.
    if parent > 0:
        return {
            "remainingTime": parent,
            "status": "COOKING",
        }
    return {
        "remainingTime": 0,
        "status": "COOKED",
    }


@Subscription("Subscription.launchAndWaitCookingTimer")
async def subscribe_subscription_launch_and_wait_cooking_timer(
    parent, args, ctx, info
):
    # Look up the requested recipe.
    recipe = None
    for recipe_item in RECIPES:
        if recipe_item["id"] == args["id"]:
            recipe = recipe_item

    if not recipe:
        raise Exception(f"The recipe < {args['id']} > does not exist.")

    # Yield only the remaining time; the resolver shapes it for the schema.
    for i in range(recipe["cookingTime"]):
        yield recipe["cookingTime"] - i
        await asyncio.sleep(1)

    # Emit a final 0 so the resolver can report the COOKED status.
    yield 0
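The same split between a raw event source and a shaping step applies when the source delivers serialized payloads. The following standalone sketch does not use Tartiflette: raw_timer_events is a hypothetical feed that yields JSON-encoded bytes, as a pub/sub client might, shape_timer_event plays the role of the resolver, and shaped_events stands in for the engine applying that resolver to each yielded value. All three names and the payload layout are assumptions made for illustration.

```python
import asyncio
import json
from typing import Any, AsyncIterator, Dict


async def raw_timer_events() -> AsyncIterator[bytes]:
    # Hypothetical broker feed: JSON-encoded bytes, one message per tick.
    for remaining in (2, 1, 0):
        yield json.dumps({"remaining": remaining}).encode("utf-8")
        await asyncio.sleep(0)


def shape_timer_event(raw: bytes) -> Dict[str, Any]:
    # Decode the raw payload and map it onto the fields the schema exposes.
    remaining = json.loads(raw.decode("utf-8"))["remaining"]
    return {
        "remainingTime": remaining,
        "status": "COOKING" if remaining > 0 else "COOKED",
    }


async def shaped_events() -> AsyncIterator[Dict[str, Any]]:
    # Stand-in for the engine: apply the shaping step to every yielded value.
    async for raw in raw_timer_events():
        yield shape_timer_event(raw)
```

Keeping the decoding logic in a plain function like shape_timer_event makes it easy to unit test separately from the event source.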