Messages - Python SDK feature guide

This page shows how to work with Queries, Signals, Updates, and Dynamic Handlers in the Python SDK.

We'll use the following example, explained in detail in the sections below:

import asyncio
from dataclasses import dataclass
from enum import IntEnum
from typing import Optional

from temporalio import workflow


class Language(IntEnum):
    Chinese = 1
    English = 2
    French = 3
    Spanish = 4
    Portuguese = 5


GREETINGS = {
    Language.English: "Hello, world!",
    Language.Chinese: "你好,世界!",
}


@dataclass
class GetLanguagesInput:
    supported_only: bool


@dataclass
class ApproveInput:
    name: str


@workflow.defn
class GreetingWorkflow:
    def __init__(self) -> None:
        self.approved_for_release = False
        self.approver_name: Optional[str] = None
        self.language = Language.English

    @workflow.run
    async def run(self) -> str:
        await workflow.wait_condition(lambda: self.approved_for_release)
        return GREETINGS[self.language]

    @workflow.query
    def get_language(self) -> Language:
        return self.language

    @workflow.query
    def get_languages(self, input: GetLanguagesInput) -> list[Language]:
        if input.supported_only:
            return [lang for lang in Language if lang in GREETINGS]
        else:
            return list(Language)

    @workflow.signal
    def approve(self, input: ApproveInput) -> None:
        self.approved_for_release = True
        self.approver_name = input.name

    @workflow.update
    def set_language(self, language: Language) -> Language:
        previous_language, self.language = self.language, language
        return previous_language

    @set_language.validator
    def validate_language(self, language: Language) -> None:
        if language not in GREETINGS:
            raise ValueError(f"{language.name} is not supported")

Queries

A Query is a synchronous operation that's used to get the state of a Workflow Execution.

Writing a Query handler

Here's the Query handler from the example:

    @workflow.query
    def get_language(self) -> Language:
        return self.language

Notice that:

  • You use the @workflow.query decorator to define a Query handler method.
  • A Query handler is a plain def, not an async def: you can't do async operations such as executing an Activity in a Query handler.
  • The return value must be serializable: here we're using an IntEnum to return the language (plain Enum is not serializable).
  • The @workflow.query decorator takes arguments; see the reference docs.
  • A Query handler can take arguments. The arguments must be serializable, and it's recommended to use a single dataclass argument to which fields can be added as needed. Here's an example:
    @dataclass
    class GetLanguagesInput:
        supported_only: bool

    ...
    @workflow.defn
    class GreetingWorkflow:
        ...
        @workflow.query
        def get_languages(self, input: GetLanguagesInput) -> list[Language]:
            if input.supported_only:
                return [lang for lang in Language if lang in GREETINGS]
            else:
                return list(Language)
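The serializability note above can be checked outside Temporal. The sketch below uses the standard library's json module as a stand-in for a JSON-based data converter. That is an assumption made for illustration; it is not the SDK's converter. `Color` is a hypothetical plain Enum added only for contrast.

```python
import json
from enum import Enum, IntEnum


class Language(IntEnum):
    Chinese = 1
    English = 2


class Color(Enum):  # hypothetical plain Enum, for contrast only
    Red = 1


def to_json(value) -> str:
    """Encode a value with the stdlib JSON encoder (stand-in for a data converter)."""
    return json.dumps(value)


# An IntEnum member is an int, so it encodes as a bare number and decodes back.
encoded = to_json(Language.English)
decoded = Language(json.loads(encoded))

# A plain Enum member is not one of the JSON-encodable types, so encoding fails.
try:
    to_json(Color.Red)
    plain_enum_ok = True
except TypeError:
    plain_enum_ok = False
```

The IntEnum value survives the round trip as the same member, while the plain Enum never makes it past the encoder.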

Sending a Query

To send a Query to a Workflow Execution from Client code, use the query() method on the Workflow handle:

current_language = await wf_handle.query(GreetingWorkflow.get_language)
supported_languages = await wf_handle.query(
    GreetingWorkflow.get_languages, GetLanguagesInput(supported_only=True)
)

You can send a Query to a Workflow Execution that has closed.

Signals

Writing a Signal handler

A Signal is a message sent asynchronously to a running Workflow Execution which can be used to change the state and control the flow of a Workflow Execution. It can only deliver data to a Workflow Execution that has not already closed. A Signal can be sent to a Workflow Execution from a Temporal Client or from another Workflow Execution. Here's the Signal handler from the example:

    @workflow.signal
    def approve(self, input: ApproveInput) -> None:
        self.approved_for_release = True
        self.approver_name = input.name

Notice that:

  • You use the @workflow.signal decorator to define a Signal handler method.
  • The handler arguments must be serializable, and it's recommended to use a single dataclass argument to which fields can be added as needed.
  • The handler should not return a value: any returned value will be ignored.
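The recommendation to use a single dataclass argument is easier to see with a field added after the fact. A minimal sketch, assuming a JSON encoding: `comment` is a hypothetical later addition to `ApproveInput`, and `decode_approve_input` is a stand-in for the SDK's data converter, whose exact decoding rules may differ.

```python
import json
from dataclasses import asdict, dataclass
from typing import Optional


@dataclass
class ApproveInput:
    name: str
    # Hypothetical field added in a later version; the default keeps old senders valid.
    comment: Optional[str] = None


def decode_approve_input(payload: str) -> ApproveInput:
    """Rebuild the dataclass from a JSON object (stand-in for a data converter)."""
    return ApproveInput(**json.loads(payload))


# Payload written by an older sender that only knew about `name`.
old_payload = json.dumps({"name": "alice"})
# Payload written by a newer sender that fills in the new field.
new_payload = json.dumps(asdict(ApproveInput(name="bob", comment="ship it")))

old_input = decode_approve_input(old_payload)
new_input = decode_approve_input(new_payload)
```

Both payloads decode into the same handler argument type, so the handler signature does not change when a field is added.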

Non-dynamic methods can only have positional arguments.
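The handler is only half of the exchange; a Client also has to send the Signal. A minimal sketch, assuming `handle` is a Workflow handle like the `wf_handle` used to send Queries: its `signal()` method accepts either the handler method or the Signal's name, which defaults to the method name (`"approve"` here). The name form is used so that the sketch imports nothing from the SDK, and `FakeHandle` is a hypothetical stand-in that lets it run without a Temporal Service.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, List, Tuple


@dataclass
class ApproveInput:
    name: str


class FakeHandle:
    """Hypothetical stand-in for a real Workflow handle; it only records calls."""

    def __init__(self) -> None:
        self.sent: List[Tuple[str, Any]] = []

    async def signal(self, signal: str, arg: Any) -> None:
        self.sent.append((signal, arg))


async def send_approval(handle: Any, approver: str) -> None:
    # With a real handle, this call does not wait for the handler to run,
    # and a Signal carries no return value back to the sender.
    await handle.signal("approve", ApproveInput(name=approver))


handle = FakeHandle()
asyncio.run(send_approval(handle, "alice"))
```

With the real SDK, the same call is typically written as `await wf_handle.signal(GreetingWorkflow.approve, ApproveInput(name="alice"))`, which lets a type checker verify the argument.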

Updates

An Update is a trackable request sent synchronously to a running Workflow Execution that can change the state and control the flow of a Workflow Execution, and return a result. The sender of the request must wait until the update is at least accepted or rejected by a Worker, and will often opt to wait further to receive the value returned by the Update handler, or an exception indicating what went wrong. Update handlers can do arbitrarily long-running async operations (like signal handlers, and the main workflow method).

Writing Update handlers and validators as a Workflow author

Here's a Workflow Definition that illustrates how to create an Update handler, and an associated validator:

@dataclass
class HelloWorldInput:
    greeting: str


@workflow.defn
class HelloWorldWorkflow:
    def __init__(self):
        self.greeting: Optional[str] = None

    @workflow.run
    async def run(self) -> str:
        await workflow.wait_condition(lambda: self.greeting is not None)
        return f"{self.greeting}, world!"

    @workflow.update
    def set_greeting(self, input: HelloWorldInput) -> Optional[str]:
        previous_greeting, self.greeting = self.greeting, input.greeting
        return previous_greeting

    @set_greeting.validator
    def set_greeting_validator(self, input: HelloWorldInput) -> None:
        if input.greeting.lower() not in {"hello", "hola"}:
            raise Exception(f"invalid greeting: {input.greeting}")

Note the following:

  • Update handlers and validators are defined using decorators, in a similar way to Signal and Query handlers.
  • The handler method signature defines the argument type and return type that a client will use when sending an Update.
  • It is possible to use multiple arguments, but this is not recommended: instead use a single dataclass argument in which fields can be added/removed as needed.
  • Validators are optional; if you don't want to be able to reject updates then you don't need a validator.
  • To reject an Update, you raise an exception (of any type) in the validator.
  • The name of the decorator you use to define the validator is based on the name that you give to the handler.
  • The validator must take the same argument type as the handler, but always returns None.
  • The update decorator accepts arguments.
  • This handler method does not do any long-running async operations; if it did, it would need to be an async def.
  • Examples of async operations that can be done in an update handler include asyncio.sleep(...), workflow.wait_condition(...), and execution of Activities and Child Workflows.
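The validator-then-handler ordering in the notes above can be modeled without the SDK. The sketch below is a plain-Python approximation, not Temporal code: `apply_update` is a hypothetical helper that plays the Worker's role, and `Greeter` mirrors `HelloWorldWorkflow` without the decorators.

```python
from typing import Optional


class Greeter:
    """Mirrors HelloWorldWorkflow's update handler and validator, minus decorators."""

    def __init__(self) -> None:
        self.greeting: Optional[str] = None

    def set_greeting(self, greeting: str) -> Optional[str]:
        previous, self.greeting = self.greeting, greeting
        return previous

    def validate_greeting(self, greeting: str) -> None:
        if greeting.lower() not in {"hello", "hola"}:
            raise ValueError(f"invalid greeting: {greeting}")


def apply_update(target: Greeter, greeting: str) -> Optional[str]:
    """Hypothetical dispatcher: validate first, run the handler only if accepted."""
    target.validate_greeting(greeting)  # raising here rejects the update
    return target.set_greeting(greeting)  # reached only for accepted updates


greeter = Greeter()
first = apply_update(greeter, "Hello")  # accepted; there was no previous greeting
second = apply_update(greeter, "hola")  # accepted; returns the previous greeting

try:
    apply_update(greeter, "World")  # rejected by the validator
    rejected = False
except ValueError:
    rejected = True
```

Because the rejected call never reaches the handler, the stored greeting is still the value set by the last accepted update.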

Sending an Update to a Workflow Execution

Recall that when sending an Update, the client will not receive a response until a Worker is available and the Update has been delivered to the Worker. If you want the server to send a response as soon as it receives your request, then you must use a Signal instead.

To send an Update to a Workflow Execution, you have two choices:

1. Use execute_update to wait for the update to complete

execute_update sends an Update and waits until it has been completed by a Worker. It returns the Update result:

# Wait until the update is completed
update_result = await workflow_handle.execute_update(
    HelloWorldWorkflow.set_greeting,
    HelloWorldInput("Hola"),
)

2. Use start_update to receive a handle as soon as the update is accepted or rejected

start_update sends an Update and waits until the Update has been accepted or rejected by a Worker. It returns an UpdateHandle:

# Wait until the update is accepted
update_handle = await workflow_handle.start_update(
    HelloWorldWorkflow.set_greeting,
    HelloWorldInput("Hola"),
)
# Wait until the update is completed
update_result = await update_handle.result()
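The relationship between the two calls can be sketched with plain asyncio. This is a model under stated assumptions, not the SDK's implementation: `FakeUpdateHandle`, `start_update`, and `execute_update` below are hypothetical free functions, "accepted" is modeled as "the handler has been scheduled", and the sleep stands in for long-running handler work.

```python
import asyncio
from typing import Tuple


class FakeUpdateHandle:
    """Hypothetical handle for an update whose handler may still be running."""

    def __init__(self, task: "asyncio.Task[str]") -> None:
        self._task = task

    def done(self) -> bool:
        return self._task.done()

    async def result(self) -> str:
        return await self._task


async def slow_handler(greeting: str) -> str:
    await asyncio.sleep(0.01)  # stands in for long-running handler work
    return f"{greeting}, world!"


async def start_update(greeting: str) -> FakeUpdateHandle:
    # Returns as soon as the handler has been scheduled ("accepted" in this model).
    return FakeUpdateHandle(asyncio.create_task(slow_handler(greeting)))


async def execute_update(greeting: str) -> str:
    handle = await start_update(greeting)  # wait for acceptance
    return await handle.result()  # then wait for completion


async def main() -> Tuple[bool, str, str]:
    handle = await start_update("Hola")
    still_running = not handle.done()  # accepted, but not yet completed
    started_result = await handle.result()
    executed_result = await execute_update("Hello")
    return still_running, started_result, executed_result


still_running, started_result, executed_result = asyncio.run(main())
```

In this model `execute_update` is simply `start_update` followed by `result()`. The handle form is useful when the caller wants to do other work between acceptance and completion.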

Exceptions

The following exceptions might be raised by execute_update, or when calling update_handle.result() on a handle obtained from start_update:

If the Workflow handle references a Workflow that doesn't exist then execute_update and start_update will both raise temporalio.service.RPCError "sql: no rows in result set".

For more sophisticated uses of Signal and Update, and safe usage patterns, see https://github.com/temporalio/samples-python/tree/main/updates_and_signals/safe_message_handlers.

Dynamic Handler

What is a Dynamic Handler?

Temporal supports Dynamic Workflows, Activities, Signals, and Queries. These are unnamed handlers that are invoked if no other statically defined handler with the given name exists.

Dynamic Handlers provide flexibility to handle cases where the names of Workflows, Activities, Signals, or Queries aren't known ahead of time and only become available at run time.
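The fallback rule described above can be modeled in a few lines of plain Python. `HandlerRegistry` is a hypothetical class written for this illustration, not part of the SDK; it only shows the lookup order, with a statically named handler taking precedence and the dynamic handler receiving the requested name along with the arguments.

```python
from typing import Any, Callable, Dict, Optional, Sequence


class HandlerRegistry:
    """Hypothetical name-based dispatcher with an optional dynamic fallback."""

    def __init__(self) -> None:
        self._static: Dict[str, Callable[..., Any]] = {}
        self._dynamic: Optional[Callable[[str, Sequence[Any]], Any]] = None

    def register(self, name: str, handler: Callable[..., Any]) -> None:
        self._static[name] = handler

    def register_dynamic(self, handler: Callable[[str, Sequence[Any]], Any]) -> None:
        self._dynamic = handler

    def dispatch(self, name: str, *args: Any) -> Any:
        handler = self._static.get(name)
        if handler is not None:
            return handler(*args)  # a statically named handler always wins
        if self._dynamic is not None:
            return self._dynamic(name, args)  # fallback gets the name and the args
        raise LookupError(f"no handler registered for {name!r}")


registry = HandlerRegistry()
registry.register("get_language", lambda: "English")
registry.register_dynamic(lambda name, args: f"dynamic:{name}:{len(args)}")

static_result = registry.dispatch("get_language")
fallback_result = registry.dispatch("get_mood", "today")
```

The dynamic handler sees the name it was invoked under, which is why the SDK's dynamic Signal and Query handlers take a name parameter in addition to their arguments.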

caution

Dynamic Handlers should be used judiciously as a fallback mechanism rather than the primary approach. Overusing them can lead to maintainability and debugging issues down the line.

Instead, Workflows, Activities, Signals, and Queries should be defined statically whenever possible, with clear names that indicate their purpose. Use static definitions as the primary way of structuring your Workflows.

Reserve Dynamic Handlers for cases where the handler names are not known at compile time and need to be looked up dynamically at runtime. They are meant to handle edge cases and act as a catch-all, not as the main way of invoking logic.

Set a Dynamic Signal

How to set a Dynamic Signal

A Dynamic Signal in Temporal is a Signal that is invoked dynamically at runtime if no other Signal with the same name is registered. A Signal can be made dynamic by adding dynamic=True to the @workflow.signal decorator.

The Signal Handler should accept self, a string name, and a Sequence[temporalio.common.RawValue]. The payload_converter() function is used to convert a RawValue object to the desired type.

    # ...
    @workflow.signal(dynamic=True)
    async def dynamic_signal(self, name: str, args: Sequence[RawValue]) -> None:
        await self._pending_greetings.put(name)

Set a Dynamic Query

How to set a Dynamic Query

A Dynamic Query in Temporal is a Query that is invoked dynamically at runtime if no other Query with the same name is registered. A Query can be made dynamic by adding dynamic=True to the @workflow.query decorator.

The Query Handler should accept self, a string name, and a Sequence[temporalio.common.RawValue]. The payload_converter() function is used to convert a RawValue object to the desired type.

    # ...
    @workflow.query(dynamic=True)
    def dynamic_query(self, name: str, args: Sequence[RawValue]) -> str:
        return self._greeting

Set a Dynamic Workflow

How to set a Dynamic Workflow

A Dynamic Workflow in Temporal is a Workflow that is invoked dynamically at runtime if no other Workflow with the same name is registered. A Workflow can be made dynamic by adding dynamic=True to the @workflow.defn decorator. You must register the Workflow with the Worker before it can be invoked.

The Workflow Definition must then accept a single argument of type Sequence[temporalio.common.RawValue]. The payload_converter() function is used to convert a RawValue object to the desired type.

# ...
@workflow.defn(dynamic=True)
class DynamicWorkflow:
    @workflow.run
    async def run(self, args: Sequence[RawValue]) -> str:
        name = workflow.payload_converter().from_payload(args[0].payload, str)
        return await workflow.execute_activity(
            default_greeting,
            YourDataClass("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )

Set a Dynamic Activity

How to set a Dynamic Activity

A Dynamic Activity in Temporal is an Activity that is invoked dynamically at runtime if no other Activity with the same name is registered. An Activity can be made dynamic by adding dynamic=True to the @activity.defn decorator. You must register the Activity with the Worker before it can be invoked.

The Activity Definition must then accept a single argument of type Sequence[temporalio.common.RawValue]. The payload_converter() function is used to convert a RawValue object to the desired type.

# ...
@activity.defn(dynamic=True)
async def dynamic_greeting(args: Sequence[RawValue]) -> str:
    arg1 = activity.payload_converter().from_payload(args[0].payload, YourDataClass)
    return (
        f"{arg1.greeting}, {arg1.name}!\nActivity Type: {activity.info().activity_type}"
    )
# ...
@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, name: str) -> str:
        return await workflow.execute_activity(
            "unregistered_activity",
            YourDataClass("Hello", name),
            start_to_close_timeout=timedelta(seconds=10),
        )