Messages - Python SDK feature guide
This page shows how to do the following:
We'll use the following example, explained in detail in the sections below:
import asyncio
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional
from temporalio import workflow
class Language(IntEnum):
Chinese = 1
English = 2
French = 3
Spanish = 4
Portuguese = 5
GREETINGS = {
Language.English: "Hello, world!",
Language.Chinese: "你好,世界!",
}
@dataclass
class GetLanguagesInput:
supported_only: bool
@dataclass
class ApproveInput:
name: str
@workflow.defn
class GreetingWorkflow:
def __init__(self) -> None:
self.approved_for_release = False
self.approver_name: Optional[str] = None
self.language = Language.English
@workflow.run
async def run(self) -> str:
await workflow.wait_condition(lambda: self.approved_for_release)
return GREETINGS[self.language]
@workflow.query
def get_language(self) -> Language:
return self.language
@workflow.query
def get_languages(self, input: GetLanguagesInput) -> list[Language]:
if input.supported_only:
return [lang for lang in Language if lang in GREETINGS]
else:
return list(Language)
@workflow.signal
def approve(self, input: ApproveInput) -> None:
self.approved_for_release = True
self.approver_name = input.name
@workflow.update
def set_language(self, language: Language) -> Language:
previous_language, self.language = self.language, language
return previous_language
@set_language.validator
def validate_language(self, language: Language) -> None:
if language not in GREETINGS:
raise ValueError(f"{language.name} is not supported")
Queries
A Query is a synchronous operation that's used to get the state of a Workflow Execution.
Writing a Query handler (#define-query)
Here's the Query handler from the example:
@workflow.query
def get_language(self) -> Language:
return self.language
Notice that:
- You use the
@workflow.query
decorator to define a Query handler method`. - A Query handler is a plain
def
, not anasync def
: you can't do async operations such as executing an Activity in a Query handler. - The return value must be serializable: here we're using an
IntEnum
to return the language (plainEnum
is not serializable). - The
@workflow.query
decorator takes arguments; see the reference docs. - A Query handler can take arguments. The arguments must be serializable, and it's recommended to use a single dataclass argument to which fields can be added as needed. Here's an example:
@dataclass
class GetLanguagesInput:
supported_only: bool
...
@workflow.defn
class GreetingWorkflow:
....
@workflow.query
def get_languages(self, input: GetLanguagesInput) -> list[Language]:
if input.supported_only:
return [lang for lang in Language if lang in GREETINGS]
else:
return list(Language)
Sending a Query
To send a Query to a Workflow Execution from Client code, use the query()
method on the Workflow handle:
current_language = await wf_handle.query(GreetingWorkflow.get_language)
supported_languages = await wf_handle.query(
GreetingWorkflow.get_languages, GetLanguagesInput(supported_only=True)
)
You can send a Query to a Workflow Execution that has closed.
Signals
Writing a Signal handler
A Signal is a message sent asynchronously to a running Workflow Execution which can be used to change the state and control the flow of a Workflow Execution. It can only deliver data to a Workflow Execution that has not already closed. A Signal can be sent to a Workflow Execution from a Temporal Client or from another Workflow Execution. Here's the Signal handler from the example:
@workflow.signal
def approve(self, input: ApproveInput) -> None:
self.approved_for_release = True
self.approver_name = input.name
Notice that:
- You use the
@workflow.signal
decorator to define a Signal handler method. - The handler arguments must be serializable, and it's recommended to use a single dataclass argument to which fields can be added as needed.
- The handler should not return a value: any returned value will be ignored.
Non-dynamic methods can only have positional arguments.
Updates
An Update is a trackable request sent synchronously to a running Workflow Execution that can change the state and control the flow of a Workflow Execution, and return a result. The sender of the request must wait until the update is at least accepted or rejected by a Worker, and will often opt to wait further to receive the value returned by the Update handler, or an exception indicating what went wrong. Update handlers can do arbitrarily long-running async operations (like signal handlers, and the main workflow method).
Writing Update handlers and validators as a Workflow author
Here's a Workflow Definition that illustrates how to create an Update handler, and an associated validator:
@dataclass
class HelloWorldInput:
greeting: str
@workflow.defn
class HelloWorldWorkflow:
def __init__(self):
self.greeting: Optional[str] = None
@workflow.run
async def run(self) -> str:
await workflow.wait_condition(lambda: self.greeting is not None)
return f"{self.greeting}, world!"
@workflow.update
def set_greeting(self, input: HelloWorldInput) -> Optional[str]:
previous_greeting, self.greeting = self.greeting, input.greeting
return previous_greeting
@set_greeting.validator
def set_greeting_validator(self, input: HelloWorldInput) -> None:
if input.greeting.lower() not in {"hello", "hola"}:
raise Exception(f"invalid greeting: {input.greeting}")
Note the following:
- Update handlers and validators are defined using decorators, in a similar way to Signal and Query handlers.
- The handler method signature defines the argument type and return type that a client will use when sending an Update.
- It is possible to use multiple arguments, but this is not recommended: instead use a single dataclass argument in which fields can be added/removed as needed.
- Validators are optional; if you don't want to be able to reject updates then you don't need a validator.
- To reject an Update, you raise an exception (of any type) in the validator.
- The name of the decorator you use to define the validator is based on the name that you give to the handler.
- The validator must take the same argument type as the handler, but always returns
None
. - The
update
decorator accepts arguments. - This handler method does not do any long-running async operations; if it did, it would need to be an
async def
. - Examples of async operations that can be done in an update handler include
asyncio.sleep(...)
,workflow.wait_condition(...)
, and execution of Activities and Child Workflows.
Sending an Update to a Workflow Execution
Recall that when sending an Update, the client will not receive a response until a Worker is available and the Update has been delivered to the Worker. If you want the server to send a response as soon as it receives your request, then you must use a Signal instead.
To send an Update to a Workflow Execution, you have two choices:
1. Use execute_update
to wait for the update to complete
execute_update
sends an Update and waits until it has been completed by a Worker. It returns the Update result:
# Wait until the update is completed
update_result = await workflow_handle.execute_update(
HelloWorldWorkflow.set_greeting,
HelloWorldInput("World"),
)
2. Use start_update
to receive a handle as soon as the update is accepted or rejected
start_update
sends an Update and waits until the Update has been accepted or rejected by a Worker. It returns an UpdateHandle
:
# Wait until the update is accepted
update_handle = await workflow_handle.start_update(
HelloWorldWorkflow.set_greeting,
HelloWorldInput("World"),
)
# Wait until the update is completed
update_result = await update_handle.result()
Exceptions
The following exceptions might be raised by execute_update
, or when calling update_handle.result()
on a handle obtained from start_update
:
-
temporalio.client.WorkflowUpdateFailedError
The Update was either rejected by the validator, or the Workflow author deliberately failed the Update by raising
ApplicationError
in the handler. -
temporalio.service.RPCError
"workflow execution already completed"`This will happen if the Workflow finished while the Update handler execution was in progress, for example because
- The Workflow was canceled or was deliberately failed (the Workflow author raised
ApplicationError
outside an Update handler) - The Workflow completed normally or continued-as-new and the Workflow author did not wait for handlers to be finished.
- The Workflow was canceled or was deliberately failed (the Workflow author raised
If the Workflow handle references a Workflow that doesn't exist then execute_update
and start_update
will both raise temporalio.service.RPCError "sql: no rows in result set"
.
For more sophisticated uses of Signal and Update, and safe usage patterns, see https://github.com/temporalio/samples-python/tree/main/updates_and_signals/safe_message_handlers.
Dynamic Handler
What is a Dynamic Handler?
Temporal supports Dynamic Workflows, Activities, Signals, and Queries. These are unnamed handlers that are invoked if no other statically defined handler with the given name exists.
Dynamic Handlers provide flexibility to handle cases where the names of Workflows, Activities, Signals, or Queries aren't known at run time.
Dynamic Handlers should be used judiciously as a fallback mechanism rather than the primary approach. Overusing them can lead to maintainability and debugging issues down the line.
Instead, Workflows, Activities, Signals, and Queries should be defined statically whenever possible, with clear names that indicate their purpose. Use static definitions as the primary way of structuring your Workflows.
Reserve Dynamic Handlers for cases where the handler names are not known at compile time and need to be looked up dynamically at runtime. They are meant to handle edge cases and act as a catch-all, not as the main way of invoking logic.
Set a Dynamic Signal
How to set a Dynamic Signal
A Dynamic Signal in Temporal is a Signal that is invoked dynamically at runtime if no other Signal with the same input is registered.
A Signal can be made dynamic by adding dynamic=True
to the @signal.defn
decorator.
The Signal Handler should accept self
, a string input, and a Sequence[temporalio.common.RawValue]
.
The payload_converter() function is used to convert a RawValue
object to the desired type.
# ...
@workflow.signal(dynamic=True)
async def dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None:
await self._pending_greetings.put(name)
Set a Dynamic Query
How to set a Dynamic Query
A Dynamic Query in Temporal is a Query that is invoked dynamically at runtime if no other Query with the same name is registered.
A Query can be made dynamic by adding dynamic=True
to the @query.defn
decorator.
The Query Handler should accept self
, a string name, and a Sequence[temporalio.common.RawValue]
.
The payload_converter() function is used to convert a RawValue
object to the desired type.
# ...
@workflow.query(dynamic=True)
def dynamic_query(self, input: str, args: Sequence[RawValue]) -> str:
return self._greeting
Set a Dynamic Workflow
How to set a Dynamic Workflow
A Dynamic Workflow in Temporal is a Workflow that is invoked dynamically at runtime if no other Workflow with the same name is registered.
A Workflow can be made dynamic by adding dynamic=True
to the @workflow.defn
decorator.
You must register the Workflow with the Worker before it can be invoked.
The Workflow Definition must then accept a single argument of type Sequence[temporalio.common.RawValue]
.
The payload_converter() function is used to convert a RawValue
object to the desired type.
# ...
@workflow.defn(dynamic=True)
class DynamicWorkflow:
@workflow.run
async def run(self, args: Sequence[RawValue]) -> str:
name = workflow.payload_converter().from_payload(args[0].payload, str)
return await workflow.execute_activity(
default_greeting,
YourDataClass("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)
Set a Dynamic Activity
How to set a Dynamic Activity
A Dynamic Activity in Temporal is an Activity that is invoked dynamically at runtime if no other Activity with the same name is registered.
An Activity can be made dynamic by adding dynamic=True
to the @activity.defn
decorator.
You must register the Activity with the Worker before it can be invoked.
The Activity Definition must then accept a single argument of type Sequence[temporalio.common.RawValue]
.
The payload_converter() function is used to convert a RawValue
object to the desired type.
# ...
@activity.defn(dynamic=True)
async def dynamic_greeting(args: Sequence[RawValue]) -> str:
arg1 = activity.payload_converter().from_payload(args[0].payload, YourDataClass)
return (
f"{arg1.greeting}, {arg1.name}!\nActivity Type: {activity.info().activity_type}"
)
# ...
@workflow.defn
class GreetingWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_activity(
"unregistered_activity",
YourDataClass("Hello", name),
start_to_close_timeout=timedelta(seconds=10),
)