Subscription
Subscription is the third operation type available in GraphQL. It brings the event-based subscriptions mindset to the engine.
It's up to you to implement the event technology you want, like Google Pub/Sub, Nats, Redis... in our example, we decided to focus purely on the engine part of the feature.
Engine: How to execute a subscription?
The engine is responsible for executing both the Query/Mutations and the Subscription's. The first ones are executed by the execute method, where Subscription, is executed by the subscribe method.
The parameters that are available on the subscribe method are:
query(Union[str, bytes]): the GraphQL request/query as UTF8-encoded stringoperation_name(Optional[str]): the operation name to executecontext(Optional[Any]): value containing anything you could need and which will be available during all the execution processvariables(Optional[Dict[str, Any]]): the variables provided in the GraphQL requestinitial_value(Optional[Any]): an initial value which will be forwarded to the resolver of root type (Query/Mutation/Subscription) fields
from tartiflette import create_engine
engine = await create_engine(
"myDsl.graphql"
)
async for result in engine.subscribe(
query="subscription MyLiveVideo($id: String!) { videoLive(id: $id) { id viewsNumber } }",
operation_name="MyLiveVideo",
context={
"mysql_client": MySQLClient(),
"auth_info": AuthInfo(),
},
variables={
"id": "1234",
},
initial_value={},
):
pass
# each yield `result` will contains something like
# {
# "data": {
# "videoLive": {
# "id": "1234",
# "viewsNumber": 87564
# }
# }
# }
@Subscription: How to subscribe to a field?
In the Tartiflette engine, to subscribe to a field, you simply use the decorator @Subscription over a callable which returns an async generator. That's all there is to it. For advanced use-cases, take a look at putting a @Resolver on top of a Subscription (see below).
import asyncio
from tartiflette import Subscription
from recipes_manager.data import RECIPES
@Subscription("Subscription.launchAndWaitCookingTimer")
async def subscribe_subscription_launch_and_wait_cooking_timer(
parent, args, ctx, info
):
recipe = None
for recipe_item in RECIPES:
if recipe_item["id"] == args["id"]:
recipe = recipe_item
if not recipe:
raise Exception(f"The recipe < {args['id']} > does not exist.")
for i in range(recipe["cookingTime"]):
yield {
"remainingTime": recipe["cookingTime"] - i,
"status": "COOKING",
}
await asyncio.sleep(1)
yield {
"launchAndWaitCookingTimer": {
"remainingTime": 0,
"status": "COOKED",
},
}
@Resolver: Manipulating and shaping the result of a @Subscription function
In some cases, especially when you use tools like Redis, Google Pub/Sub etc... the value which will be yielded won't be structured as expected by the schema. In addition to the @Subscription decorator, you can implement a @Resolver wrapper to shape the data accordingly to the return type.
import asyncio
from tartiflette import Resolver, Subscription
from recipes_manager.data import RECIPES
@Resolver("Subscription.launchAndWaitCookingTimer")
async def resolve_subscription_launch_and_wait_cooking_timer(
parent, args, ctx, info
):
if parent > 0:
return {
"remainingTime": parent,
"status": "COOKING",
}
return {
"remainingTime": 0,
"status": "COOKED",
}
@Subscription("Subscription.launchAndWaitCookingTimer")
async def subscribe_subscription_launch_and_wait_cooking_timer(
parent, args, ctx, info
):
recipe = None
for recipe_item in RECIPES:
if recipe_item["id"] == args["id"]:
recipe = recipe_item
if not recipe:
raise Exception(f"The recipe < {args['id']} > does not exist.")
for i in range(recipe["cookingTime"]):
yield recipe["cookingTime"] - i
await asyncio.sleep(1)