Developer's guide - Features
The Features section of the Temporal Developer's guide provides basic implementation guidance on how to use many of the development features available to Workflows and Activities in the Temporal Platform.
This guide is a work in progress. Some sections may be incomplete or missing for some languages. Information may change at any time.
If you can't find what you are looking for in the Developer's guide, it could be in older docs for SDKs.
In this section you can find the following:
- How to develop Signals
- How to develop Queries
- How to start a Child Workflow Execution
- How to start a Temporal Cron Job
- How to use Continue-As-New
- How to set Workflow timeouts & retries
- How to set Activity timeouts & retries
- How to Heartbeat an Activity
- How to Asynchronously complete an Activity
- How to register Namespaces
Signals
A Signal is a message sent to a running Workflow Execution.
Signals are defined in your code and handled in your Workflow Definition. Signals can be sent to Workflow Executions from a Temporal Client or from another Workflow Execution.
Define Signal
A Signal has a name and can have arguments.
- The name, also called a Signal type, is a string.
- The arguments must be serializable.
- Go
- Java
- PHP
- Python
- TypeScript
In Go, structs should be used to define Signals and carry data, as long as the struct is serializable via the Data Converter.
The Receive() method on the Signal Channel uses the Data Converter to decode the data into the struct within the Workflow.
Only public (exported) fields are serializable.
type MySignal struct {
    Message string // serializable
    message string // not serializable
}
In Java, the @SignalMethod annotation indicates that the method is used to handle and react to external Signals.
@SignalMethod
void mySignal(String signalName);
The method can have parameters that contain the Signal payload. The parameters must be serializable by the default Jackson JSON Payload Converter.
void mySignal(String signalName, Object... args);
This method does not return a value and must have a void return type.
Things to consider when defining Signals:
- Use Workflow object constructors and initialization blocks to initialize the internal data structures if possible.
- Signals might be received by a Workflow before the Workflow method is executed. When implementing Signals in scenarios where this can occur, assume that no parts of Workflow code ran. In some cases, Signal method implementation might require some initialization to be performed by the Workflow method code first—for example, when the Signal processing depends on, and is defined by the Workflow input. In this case, you can use a flag to determine whether the Workflow method is already triggered; if not, persist the Signal data into a collection for delayed processing by the Workflow method.
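The flag-and-collection approach described in the last item can be sketched in plain Python. This is a minimal model, not Temporal SDK code: the class GreetingWorkflowSketch and its methods on_signal and run are hypothetical names that stand in for a Workflow implementation, its Signal method, and its Workflow method.

```python
from typing import List


class GreetingWorkflowSketch:
    """Plain-Python model of a Workflow object; not Temporal SDK code."""

    def __init__(self) -> None:
        # Initialize internal state in the constructor so the Signal handler
        # can run safely before the Workflow method does.
        self.started = False          # flag: has the Workflow method begun?
        self.pending: List[str] = []  # Signals held for delayed processing
        self.processed: List[str] = []
        self.prefix = ""

    def on_signal(self, payload: str) -> None:
        """Signal handler (hypothetical): may be invoked before run()."""
        if not self.started:
            # Processing depends on the Workflow input, so defer it.
            self.pending.append(payload)
        else:
            self._process(payload)

    def run(self, prefix: str) -> None:
        """Workflow method (hypothetical): applies input, then drains deferred Signals."""
        self.prefix = prefix
        self.started = True
        while self.pending:
            self._process(self.pending.pop(0))

    def _process(self, payload: str) -> None:
        self.processed.append(f"{self.prefix}:{payload}")


early_wf = GreetingWorkflowSketch()
early_wf.on_signal("early")  # arrives before the Workflow method runs
early_wf.run("order-42")
early_wf.on_signal("late")   # arrives after the Workflow method has started
print(early_wf.processed)    # ['order-42:early', 'order-42:late']
```

Both Signals end up processed with the Workflow input applied, regardless of whether they arrived before or after the Workflow method started.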
In PHP, Workflows can answer synchronous Queries and receive Signals.
All interface methods must have one of the following annotations:
- #[WorkflowMethod] indicates an entry point to a Workflow. It contains parameters that specify timeouts and a Task Queue name. Required parameters (such as executionStartToCloseTimeoutSeconds) that are not specified through the annotation must be provided at runtime.
- #[SignalMethod] indicates a method that reacts to external Signals. It must have a void return type.
- #[QueryMethod] indicates a method that reacts to synchronous Query requests. It must have a non-void return type.
It is possible (though not recommended for usability reasons) to annotate a concrete class implementation.
You can have more than one method with the same annotation (except #[WorkflowMethod]).
For example:
use Temporal\Workflow\WorkflowInterface;
use Temporal\Workflow\WorkflowMethod;
use Temporal\Workflow\SignalMethod;
use Temporal\Workflow\QueryMethod;
#[WorkflowInterface]
interface FileProcessingWorkflow
{
#[WorkflowMethod]
public function processFile(Argument $args);
#[QueryMethod("history")]
public function getHistory(): array;
#[QueryMethod("status")]
public function getStatus(): string;
#[SignalMethod]
public function retryNow(): void;
#[SignalMethod]
public function abandon(): void;
}
Note that the name parameter of the Workflow method annotations can be used to specify the name of Workflow, Signal, and Query types. If name is not specified, the short name of the Workflow interface is used.
In the preceding code, #[WorkflowMethod(name)] is not specified, so the Workflow Type defaults to "FileProcessingWorkflow".
In Python, to define a Signal, set the Signal decorator @workflow.signal on the Signal function inside your Workflow.
@workflow.signal
def your_signal(self, value: str) -> None:
    self._signal = value
The @workflow.signal decorator defines a method as a Signal. Signals can be asynchronous or synchronous methods and can be inherited; however, if a method is overridden, the override must also be decorated.
Dynamic Signals
You can use @workflow.signal(dynamic=True), which means all other unhandled Signals fall through to this handler.
Your method parameters must be self, a string Signal name, and a *args variable argument parameter.
@workflow.signal(dynamic=True)
def signal_dynamic(self, name: str, *args: Any) -> None:
    self._last_event = f"signal_dynamic {name}: {args[0]}"
Customize name
Non-dynamic methods can only have positional arguments. Temporal suggests taking a single argument that is an object or data class of fields that can be added to as needed.
Return values from Signal methods are ignored.
You can pass a name parameter to customize the Signal's name; otherwise it defaults to the unqualified method __name__.
The following example sets a custom Signal name.
@workflow.signal(name="Custom-Name")
def signal(self, arg: str) -> None:
    self._last_event = f"signal: {arg}"
You can set either the name or the dynamic parameter in a Signal's decorator, but not both.
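To illustrate how default names, custom names, and a dynamic handler interact, here is a minimal plain-Python model of Signal routing. It does not use the Temporal SDK; SignalDispatcherSketch, register, and deliver are hypothetical names, and the routing rules are a simplification of what the text above describes.

```python
from typing import Any, Callable, Dict, List, Optional


class SignalDispatcherSketch:
    """Plain-Python model of Signal routing; not Temporal SDK code."""

    def __init__(self) -> None:
        self.named: Dict[str, Callable[..., None]] = {}
        self.dynamic: Optional[Callable[..., None]] = None

    def register(
        self,
        handler: Callable[..., None],
        *,
        name: Optional[str] = None,
        dynamic: bool = False,
    ) -> None:
        if name is not None and dynamic:
            raise ValueError("set either name or dynamic, not both")
        if dynamic:
            self.dynamic = handler
        else:
            # Default to the function's unqualified __name__.
            self.named[name or handler.__name__] = handler

    def deliver(self, signal_name: str, *args: Any) -> None:
        handler = self.named.get(signal_name)
        if handler is not None:
            handler(*args)
        elif self.dynamic is not None:
            # Unhandled names fall through to the dynamic handler.
            self.dynamic(signal_name, *args)
        # With no matching handler at all, this sketch ignores the Signal.


events: List[str] = []


def your_signal(value: str) -> None:
    events.append(f"your_signal: {value}")


def renamed(arg: str) -> None:
    events.append(f"signal: {arg}")


def signal_dynamic(name: str, *args: Any) -> None:
    events.append(f"signal_dynamic {name}: {args[0]}")


dispatcher = SignalDispatcherSketch()
dispatcher.register(your_signal)                 # name defaults to "your_signal"
dispatcher.register(renamed, name="Custom-Name")
dispatcher.register(signal_dynamic, dynamic=True)

dispatcher.deliver("your_signal", "a")
dispatcher.deliver("Custom-Name", "b")
dispatcher.deliver("unknown", "c")               # falls through to the dynamic handler
print(events)
```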
- TypeScript
- JavaScript
import { defineSignal } from '@temporalio/workflow';
interface JoinInput {
userId: string;
groupId: string;
}
export const joinSignal = defineSignal<[JoinInput]>('join');
import { defineSignal } from '@temporalio/workflow';
export const joinSignal = defineSignal('join');
Handle Signal
Workflows listen for Signals by the Signal's name.
- Go
- Java
- PHP
- Python
- TypeScript
In Go, use the GetSignalChannel() API from the go.temporal.io/sdk/workflow package to get the Signal Channel.
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
// ...
var signal MySignal
signalChan := workflow.GetSignalChannel(ctx, "your-signal-name")
signalChan.Receive(ctx, &signal)
if len(signal.Message) > 0 && signal.Message != "SOME_VALUE" {
return errors.New("signal")
}
// ...
}
In the example above, the Workflow code uses workflow.GetSignalChannel to open a workflow.Channel for the Signal type (identified by the Signal name).
Before completing the Workflow or using Continue-As-New, make sure to do an asynchronous drain on the Signal channel. Otherwise, the Signals will be lost.
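The idea of an asynchronous drain can be sketched in plain Python. This is a model only, not Go SDK code: SignalChannelSketch and its receive_async method are hypothetical stand-ins for a Signal Channel with a non-blocking receive, and drain is an illustrative helper.

```python
from collections import deque
from typing import Any, Deque, List, Tuple


class SignalChannelSketch:
    """Plain-Python model of a buffered Signal Channel; not Temporal SDK code."""

    def __init__(self) -> None:
        self._buffer: Deque[Any] = deque()

    def send(self, value: Any) -> None:
        self._buffer.append(value)

    def receive_async(self) -> Tuple[Any, bool]:
        """Non-blocking receive: returns (value, True), or (None, False) when empty."""
        if self._buffer:
            return self._buffer.popleft(), True
        return None, False


def drain(channel: SignalChannelSketch) -> List[Any]:
    """Collect every Signal that is already buffered, without blocking."""
    drained: List[Any] = []
    while True:
        value, ok = channel.receive_async()
        if not ok:
            return drained
        drained.append(value)


channel = SignalChannelSketch()
channel.send({"Message": "first"})
channel.send({"Message": "second"})

# Before completing or continuing as new, drain what is still buffered so it
# can be processed or passed along as input to the next run.
carried_over = drain(channel)
print(carried_over)
```

The loop stops as soon as the non-blocking receive reports an empty buffer, so the Workflow never waits for Signals that have not arrived.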
In Java, use the @SignalMethod annotation to handle Signals in the Workflow interface.
The Signal type defaults to the name of the method. In the following example, the Signal type defaults to retryNow.
@WorkflowInterface
public interface FileProcessingWorkflow {
@WorkflowMethod
String processFile(Arguments args);
@SignalMethod
void retryNow();
}
To override this default naming and assign a custom Signal type, use the @SignalMethod annotation with the name parameter.
In the following example, the Signal type is set to retrysignal.
@WorkflowInterface
public interface FileProcessingWorkflow {
@WorkflowMethod
String processFile(Arguments args);
@SignalMethod(name = "retrysignal")
void retryNow();
}
A Workflow interface can define any number of methods annotated with @SignalMethod, but the method names or the name parameters for each must be unique.
In the following example, we define a Signal method updateGreeting to update the greeting in the Workflow.
We set a Workflow.await in the Workflow implementation to block the current Workflow Execution until the provided unblock condition evaluates to true.
In this case, the unblocking condition evaluates to true when the Signal to update the greeting is received.
@WorkflowInterface
public interface HelloWorld {
@WorkflowMethod
void sayHello(String name);
@SignalMethod
void updateGreeting(String greeting);
}
public class HelloWorldImpl implements HelloWorld {
private final Logger workflowLogger = Workflow.getLogger(HelloWorldImpl.class);
private String greeting;
@Override
public void sayHello(String name) {
int count = 0;
while (!"Bye".equals(greeting)) {
String oldGreeting = greeting;
Workflow.await(() -> !Objects.equals(greeting, oldGreeting));
}
workflowLogger.info(++count + ": " + greeting + " " + name + "!");
}
@Override
public void updateGreeting(String greeting) {
this.greeting = greeting;
}
}
This Workflow completes when the Signal updates the greeting to Bye.
Dynamic Signal Handler
You can also implement Signal handlers dynamically. This is useful for library-level code and implementation of DSLs.
Use Workflow.registerListener(Object) to register an implementation of the DynamicSignalHandler in the Workflow implementation code.
Workflow.registerListener(
(DynamicSignalHandler)
(signalName, encodedArgs) -> name = encodedArgs.get(0, String.class));
When registered, any Signals sent to the Workflow without a defined handler will be delivered to the DynamicSignalHandler.
Note that you can only register one Workflow.registerListener(Object) per Workflow Execution.
DynamicSignalHandler can be implemented in both regular and dynamic Workflow implementations.
In PHP, use the #[SignalMethod] annotation to handle Signals in the Workflow interface:
use Temporal\Workflow;
#[Workflow\WorkflowInterface]
class YourWorkflow
{
private bool $value;
#[Workflow\WorkflowMethod]
public function run()
{
yield Workflow::await(fn()=> $this->value);
return 'OK';
}
#[Workflow\SignalMethod]
public function setValue(bool $value)
{
$this->value = $value;
}
}
In the preceding example, the Signal handler updates the private $value property.
The main Workflow coroutine waits for the value to change by using the Workflow::await() function.
In Python, to send a Signal to the Workflow, use the signal method from the WorkflowHandle class.
await handle.signal("some signal")
- TypeScript
- JavaScript
import { setHandler } from '@temporalio/workflow';
export async function yourWorkflow() {
const groups = new Map<string, Set<string>>();
setHandler(joinSignal, ({ userId, groupId }: JoinInput) => {
const group = groups.get(groupId);
if (group) {
group.add(userId);
} else {
groups.set(groupId, new Set([userId]));
}
});
}
import { setHandler } from '@temporalio/workflow';
export async function yourWorkflow() {
const groups = new Map();
setHandler(joinSignal, ({ userId, groupId }) => {
const group = groups.get(groupId);
if (group) {
group.add(userId);
}
else {
groups.set(groupId, new Set([userId]));
}
});
}
Send Signal from Client
When a Signal is sent successfully from the Temporal Client, the WorkflowExecutionSignaled Event appears in the Event History of the Workflow that receives the Signal.
- Go
- Java
- PHP
- Python
- TypeScript
In Go, use the SignalWorkflow() method on an instance of the Go SDK Temporal Client to send a Signal to a Workflow Execution.
Pass in both the Workflow Id and Run Id to uniquely identify the Workflow Execution.
If only the Workflow Id is supplied (provide an empty string as the Run Id param), the Workflow Execution that is Running receives the Signal.
// ...
signal := MySignal {
Message: "Some important data",
}
err = temporalClient.SignalWorkflow(context.Background(), "your-workflow-id", runID, "your-signal-name", signal)
if err != nil {
log.Fatalln("Error sending the Signal", err)
return
}
// ...
Possible errors:
serviceerror.NotFound
serviceerror.Internal
serviceerror.Unavailable
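The targeting rule described above (Workflow Id plus an optional Run Id) can be modeled in a few lines of plain Python. This is a simplified sketch, not Temporal SDK or Server code: resolve_signal_target and the open_runs mapping are hypothetical, and LookupError merely stands in for a NotFound-style error.

```python
from typing import Dict, Tuple


def resolve_signal_target(
    open_runs: Dict[str, str], workflow_id: str, run_id: str = ""
) -> Tuple[str, str]:
    """Return the (workflow_id, run_id) pair that would receive the Signal.

    open_runs maps each Workflow Id to the Run Id of its running execution.
    """
    current_run = open_runs.get(workflow_id)
    if current_run is None:
        # Stand-in for a NotFound-style error from the service.
        raise LookupError(f"no running execution for {workflow_id!r}")
    if run_id and run_id != current_run:
        # In this model, only the running execution can be signaled.
        raise LookupError(f"run {run_id!r} of {workflow_id!r} is not running")
    return workflow_id, current_run


open_runs = {"your-workflow-id": "run-2"}

# Empty Run Id: the running execution is selected.
by_workflow_id = resolve_signal_target(open_runs, "your-workflow-id")
# Explicit Run Id: the pair identifies the execution exactly.
by_both_ids = resolve_signal_target(open_runs, "your-workflow-id", "run-2")
print(by_workflow_id, by_both_ids)
```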
In Java, to send a Signal to a Workflow Execution from a Client, call the Signal method, annotated with @SignalMethod in the Workflow interface, from the Client code.
In the following Client code example, we start the Workflow greetCustomer and call the Signal method addCustomer that is handled in the Workflow.
// create a typed Workflow stub for GreetingsWorkflow
GreetingsWorkflow workflow = client.newWorkflowStub(GreetingsWorkflow.class,
WorkflowOptions.newBuilder()
// set the Task Queue
.setTaskQueue(taskQueue)
// Workflow Id is recommended but not required
.setWorkflowId(workflowId)
.build());
// start the Workflow
WorkflowClient.start(workflow::greetCustomer);
// send a Signal to the Workflow
Customer customer = new Customer("John", "Spanish", "john@john.com");
workflow.addCustomer(customer); //addCustomer is the Signal method defined in the greetCustomer Workflow.
See Handle Signals for details on how to handle Signals in a Workflow.
In PHP, to send a Signal to a Workflow Execution from a Client, call the Signal method, annotated with #[SignalMethod] in the Workflow interface, from the Client code.
To send a Signal to a Workflow, use WorkflowClient->newWorkflowStub or WorkflowClient->newUntypedWorkflowStub:
$workflow = $workflowClient->newWorkflowStub(YourWorkflow::class);
$run = $workflowClient->start($workflow);
// do something
$workflow->setValue(true);
assert($run->getValue() === true);
Use WorkflowClient->newRunningWorkflowStub or WorkflowClient->newUntypedRunningWorkflowStub with a Workflow Id to send Signals to already running Workflows.
$workflow = $workflowClient->newRunningWorkflowStub(YourWorkflow::class, 'workflowID');
$workflow->setValue(true);
See Handle Signal for details on how to handle Signals in a Workflow.
In Python, to send a Signal from the Client, use the signal() function on the Workflow handle.
To get the Workflow handle, you can use any of the following options.
- Use the get_workflow_handle() method.
- Use the get_workflow_handle_for() method to get a type-safe Workflow handle by its Workflow Id.
- Use the start_workflow() method to start a Workflow and return its handle.
from temporalio.client import Client

async def your_function():
    client = await Client.connect("localhost:7233")
    # get_workflow_handle() needs only the Workflow Id.
    handle = client.get_workflow_handle("your-workflow-id")
    # "your-signal-name" is a placeholder for a Signal defined in your Workflow.
    await handle.signal("your-signal-name")
import { WorkflowClient } from '@temporalio/client';
import { joinSignal } from './workflows';
const client = new WorkflowClient();
const handle = client.getHandle('workflow-id-123');
await handle.signal(joinSignal, { userId: 'user-1', groupId: 'group-1' });
Send Signal from Workflow
A Workflow can send a Signal to another Workflow, in which case it's called an External Signal.
When an External Signal is sent:
- A SignalExternalWorkflowExecutionInitiated Event appears in the sender's Event History.
- A WorkflowExecutionSignaled Event appears in the recipient's Event History.
- Go
- Java
- PHP
- Python
- TypeScript
In Go, a Signal can be sent from within a Workflow to a different Workflow Execution using the SignalExternalWorkflow API from the go.temporal.io/sdk/workflow package.
// ...
func YourWorkflowDefinition(ctx workflow.Context, param YourWorkflowParam) error {
//...
signal := MySignal {
Message: "Some important data",
}
err := workflow.SignalExternalWorkflow(ctx, "some-workflow-id", "", "your-signal-name", signal).Get(ctx, nil)
if err != nil {
// ...
}
// ...
}
In Java, to send a Signal from within a Workflow to a different Workflow Execution, initiate an ExternalWorkflowStub in the implementation of the current Workflow and call the Signal method defined in the other Workflow.
The following example shows how to use an untyped ExternalWorkflowStub in the Workflow implementation to send a Signal to another Workflow.
public String sendGreeting(String name) {
// initiate ExternalWorkflowStub to call another Workflow by its Id "ReplyWF"
ExternalWorkflowStub callRespondWorkflow = Workflow.newUntypedExternalWorkflowStub("ReplyWF");
String responseTrigger = activity.greeting("Hello", name);
// send a Signal from this sendGreeting Workflow to the other Workflow
// by calling the Signal method name "getGreetCall" defined in that Workflow.
callRespondWorkflow.signal("getGreetCall", responseTrigger);
return responseTrigger;
}
In PHP, to send a Signal to a Workflow, use WorkflowClient->newWorkflowStub or WorkflowClient->newUntypedWorkflowStub:
$workflow = $workflowClient->newWorkflowStub(YourWorkflow::class);
$run = $workflowClient->start($workflow);
// do something
$workflow->setValue(true);
assert($run->getValue() === true);
Use WorkflowClient->newRunningWorkflowStub or WorkflowClient->newUntypedRunningWorkflowStub with a Workflow Id to send Signals to a running Workflow.
$workflow = $workflowClient->newRunningWorkflowStub(YourWorkflow::class, 'workflowID');
$workflow->setValue(true);
In Python, use get_external_workflow_handle_for to get a typed Workflow handle to an existing Workflow by its identifier. Use get_external_workflow_handle when you don't know the type of the other Workflow.
@workflow.defn
class MyWorkflow:
    @workflow.run
    async def run(self) -> None:
        handle = workflow.get_external_workflow_handle_for(OtherWorkflow.run, "other-workflow-id")
        await handle.signal(OtherWorkflow.other_signal, "other signal arg")
The Workflow Type passed is only for type annotations and not for validation.
import { getExternalWorkflowHandle } from '@temporalio/workflow';
import { joinSignal } from './other-workflow';
export async function yourWorkflowThatSignals() {
const handle = getExternalWorkflowHandle('workflow-id-123');
await handle.signal(joinSignal, { userId: 'user-1', groupId: 'group-1' });
}
Signal-With-Start
Signal-With-Start is used from the Client. It takes a Workflow Id, Workflow arguments, a Signal name, and Signal arguments.
If there's a Workflow running with the given Workflow Id, it will be signaled. If there isn't, a new Workflow will be started and immediately signaled.
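The signal-or-start decision can be modeled in plain Python. This sketch does not use the Temporal SDK; ClusterSketch and its signal_with_start method are hypothetical names, and the in-memory dictionary is only a stand-in for the set of running Workflow Executions.

```python
from typing import Any, Dict, List


class ClusterSketch:
    """Plain-Python model of Signal-With-Start; not Temporal SDK code."""

    def __init__(self) -> None:
        # Workflow Id -> record of the running execution.
        self.running: Dict[str, Dict[str, List[Any]]] = {}

    def signal_with_start(
        self,
        workflow_id: str,
        workflow_args: List[Any],
        signal_name: str,
        signal_args: List[Any],
    ) -> bool:
        """Signal the running Workflow, starting it first if needed.

        Returns True when a new execution was started.
        """
        started = workflow_id not in self.running
        if started:
            self.running[workflow_id] = {"args": list(workflow_args), "signals": []}
        # In both cases the Signal is delivered to the running execution.
        self.running[workflow_id]["signals"].append((signal_name, list(signal_args)))
        return started


cluster = ClusterSketch()
first_call = cluster.signal_with_start("workflow-id-123", ["start arg"], "join", ["user-1"])
second_call = cluster.signal_with_start("workflow-id-123", ["unused"], "join", ["user-2"])
print(first_call, second_call)  # True False
print(cluster.running["workflow-id-123"])
```

The second call finds a running execution, so its Workflow arguments are not used and only the Signal is delivered.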
- Go
- Java
- PHP
- Python
- TypeScript
In Go, use the SignalWithStartWorkflow() API on the Go SDK Temporal Client to start a Workflow Execution (if not already running) and pass it the Signal at the same time.
Because the Workflow Execution might not exist, this API does not take a Run Id as a parameter.
// ...
signal := MySignal {
Message: "Some important data",
}
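// SignalWithStartWorkflow also takes the start options, the Workflow function, and the Workflow arguments.
// The Task Queue name and workflowParam are placeholders.
_, err = temporalClient.SignalWithStartWorkflow(context.Background(), "your-workflow-id", "your-signal-name", signal, client.StartWorkflowOptions{TaskQueue: "your-task-queue"}, YourWorkflowDefinition, workflowParam)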
if err != nil {
log.Fatalln("Error sending the Signal", err)
return
}
In Java, to send Signals to a Workflow Execution whose status is unknown, use signalWithStart with a WorkflowStub in the Client code.
This method ensures that if the Workflow Execution is in a closed state, a new Workflow Execution is spawned and the Signal is delivered to the running Workflow Execution.
Note that when signalWithStart spawns a new Workflow Execution, the Signal is delivered before the call to your @WorkflowMethod.
This means that the Signal handler in your Workflow interface code will execute before the @WorkflowMethod.
You must ensure that your code logic can deal with this.
In the following example, the Client code uses signalWithStart to send the Signal setCustomer to the UntypedWorkflowStub named GreetingWorkflow.
If the GreetingWorkflow Workflow Execution is not running, signalWithStart starts the Workflow Execution.
...
public static void signalWithStart() {
// WorkflowStub is a client-side stub to a single Workflow instance
WorkflowStub untypedWorkflowStub = client.newUntypedWorkflowStub("GreetingWorkflow",
WorkflowOptions.newBuilder()
.setWorkflowId(workflowId)
.setTaskQueue(taskQueue)
.build());
untypedWorkflowStub.signalWithStart("setCustomer", new Object[] {customer2}, new Object[] {customer1});
printWorkflowStatus();
try {
String greeting = untypedWorkflowStub.getResult(String.class);
printWorkflowStatus();
System.out.println("Greeting: " + greeting);
} catch(WorkflowFailedException e) {
System.out.println("Workflow failed: " + e.getCause().getMessage());
printWorkflowStatus();
}
}
...
The following example shows the Workflow interface for the GreetingWorkflow called in the previous example.
...
@WorkflowInterface
public interface GreetingWorkflow {
@WorkflowMethod
String greet(Customer customer);
@SignalMethod
void setCustomer(Customer customer);
@QueryMethod
Customer getCustomer();
...
}
Note that the Signal handler setCustomer is executed before the @WorkflowMethod greet is called.
In PHP, in cases where you may not know if a Workflow is running and want to send a Signal to it, use startWithSignal.
If a running Workflow exists, the startWithSignal API sends the Signal.
If there is no running Workflow, the API starts a new Workflow Run and delivers the Signal to it.
$workflow = $workflowClient->newWorkflowStub(YourWorkflow::class);
$run = $workflowClient->startWithSignal(
$workflow,
'setValue',
[true], // signal arguments
[] // start arguments
);
To send a Signal-With-Start in Python, use the start_workflow() method and pass the start_signal argument with the name of your Signal, instead of using a traditional Workflow start.
async def main():
    client = await Client.connect("localhost:7233", namespace="your-namespace")
    handle = await client.start_workflow(
        "your-workflow-name",
        "some arg",
        id="your-workflow-id",
        task_queue="your-task-queue",
        start_signal="your-signal-name",
    )
In TypeScript, use WorkflowClient.signalWithStart:
import { WorkflowClient } from '@temporalio/client';
import { joinSignal, yourWorkflow } from './workflows';
const client = new WorkflowClient();
await client.signalWithStart(yourWorkflow, {
workflowId: 'workflow-id-123',
args: [{ foo: 1 }],
signal: joinSignal,
signalArgs: [{ userId: 'user-1', groupId: 'group-1' }],
});
Queries
A Query is a synchronous operation that is used to get the state of a Workflow Execution.
Define Query
A Query has a name and can have arguments.
- The name, also called a Query type, is a string.
- The arguments must be serializable.
- Go
- Java
- PHP
- Python
- TypeScript
In Go, a Query type, also called a Query name, is a string value.
queryType := "your_query_name"
In Java, to define a Query, define the method name and the result type of the Query.
query(String queryType, Class<R> resultClass, Type resultType, Object... args);
/* @param queryType name of the Query handler. Usually it is a method name.
* @param resultClass class of the Query result type
* @param args optional Query arguments
* @param <R> type of the Query result
*/
Query methods can take in any number of input parameters which can be used to limit the data that is returned.
Use the Query method names to send and receive Queries.
Query methods must never change any Workflow state including starting Activities or blocking threads in any way.
In PHP, Workflows can answer synchronous Queries and receive Signals.
All interface methods must have one of the following annotations:
- #[WorkflowMethod] indicates an entry point to a Workflow. It contains parameters that specify timeouts and a Task Queue name. Required parameters (such as executionStartToCloseTimeoutSeconds) that are not specified through the annotation must be provided at runtime.
- #[SignalMethod] indicates a method that reacts to external Signals. It must have a void return type.
- #[QueryMethod] indicates a method that reacts to synchronous Query requests. It must have a non-void return type.
It is possible (though not recommended for usability reasons) to annotate a concrete class implementation.
You can have more than one method with the same annotation (except #[WorkflowMethod]).
For example:
use Temporal\Workflow\WorkflowInterface;
use Temporal\Workflow\WorkflowMethod;
use Temporal\Workflow\SignalMethod;
use Temporal\Workflow\QueryMethod;
#[WorkflowInterface]
interface FileProcessingWorkflow
{
#[WorkflowMethod]
public function processFile(Argument $args);
#[QueryMethod("history")]
public function getHistory(): array;
#[QueryMethod("status")]
public function getStatus(): string;
#[SignalMethod]
public function retryNow(): void;
#[SignalMethod]
public function abandon(): void;
}
Note that the name parameter of the Workflow method annotations can be used to specify the name of Workflow, Signal, and Query types. If name is not specified, the short name of the Workflow interface is used.
In the preceding code, #[WorkflowMethod(name)] is not specified, so the Workflow Type defaults to "FileProcessingWorkflow".
In Python, to define a Query, set the Query decorator @workflow.query on the Query function inside your Workflow.
@workflow.query
async def current_greeting(self) -> str:
    return self._current_greeting
The @workflow.query decorator defines a method as a Query. Queries can be asynchronous or synchronous methods and can be inherited; however, if a method is overridden, the override must also be decorated. Queries should return a value.
Dynamic Queries
You can use @workflow.query(dynamic=True), which means all other unhandled Queries fall through to this handler.
@workflow.query(dynamic=True)
def query_dynamic(self, name: str, *args: Any) -> str:
    return f"query_dynamic {name}: {args[0]}"
Customize names
You can pass a name parameter to customize the Query's name; otherwise it defaults to the unqualified method __name__.
The following example sets a custom Query name.
@workflow.query(name="Custom-Name")
def query(self, arg: str) -> str:
    # A Query returns a value and must not change Workflow state.
    return f"query: {arg}"
You can set either the name or the dynamic parameter in a Query's decorator, but not both.
In TypeScript, use defineQuery to define the name, parameters, and return value of a Query.
- TypeScript
- JavaScript
import { defineQuery } from '@temporalio/workflow';
export const getValueQuery = defineQuery<number | undefined, [string]>(
'getValue',
);
import { defineQuery } from '@temporalio/workflow';
export const getValueQuery = defineQuery('getValue');
Handle Query
Queries are handled by your Workflow.
Don't include any logic that causes Command generation within a Query handler (such as executing Activities).
Including such logic causes unexpected behavior.
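As a rough illustration of a side-effect-free Query handler, the following plain-Python sketch separates Workflow code, which may generate Commands and change state, from a Query method that only reads. It is a model, not Temporal SDK code: OrderWorkflowSketch, run_step, and query_state are hypothetical names, and the commands list merely stands in for Commands sent to the Cluster.

```python
import copy
from typing import Any, Dict, List, Tuple


class OrderWorkflowSketch:
    """Plain-Python model of a Workflow with a Query handler; not Temporal SDK code."""

    def __init__(self) -> None:
        self._state: Dict[str, Any] = {"status": "started", "completed": []}
        # Stand-in for Commands generated by Workflow code.
        self.commands: List[Tuple[str, str]] = []

    def run_step(self, activity_name: str) -> None:
        """Workflow code: allowed to generate Commands and change state."""
        self.commands.append(("ScheduleActivityTask", activity_name))
        self._state["completed"].append(activity_name)
        self._state["status"] = f"finished {activity_name}"

    def query_state(self) -> Dict[str, Any]:
        """Query handler: read-only, returns a copy, generates no Commands."""
        return copy.deepcopy(self._state)


order_wf = OrderWorkflowSketch()
order_wf.run_step("charge-card")

snapshot = order_wf.query_state()
snapshot["completed"].append("tampered")  # changes the copy only

print(order_wf.query_state())
print(len(order_wf.commands))  # querying added no Commands
```

Returning a copy is a design choice for this sketch: it keeps callers from mutating Workflow state through the Query result.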
- Go
- Java
- PHP
- Python
- TypeScript
In Go, use the SetQueryHandler API from the go.temporal.io/sdk/workflow package to set a Query Handler that listens for a Query by name.
The handler must be a function that returns two values:
- A serializable result
- An error
The handler function can receive any number of input parameters, but all input parameters must be serializable.
The following sample code sets up a Query Handler that handles the current_state Query type:
func YourWorkflow(ctx workflow.Context, input string) error {
currentState := "started" // This could be any serializable struct.
queryType := "current_state"
err := workflow.SetQueryHandler(ctx, queryType, func() (string, error) {
return currentState, nil
})
if err != nil {
currentState = "failed to register query handler"
return err
}
// Your normal Workflow code begins here, and you update the currentState as the code makes progress.
currentState = "waiting timer"
err = workflow.NewTimer(ctx, time.Hour).Get(ctx, nil)
if err != nil {
currentState = "timer failed"
return err
}
currentState = "waiting activity"
ctx = workflow.WithActivityOptions(ctx, yourActivityOptions)
err = workflow.ExecuteActivity(ctx, YourActivity, "your_input").Get(ctx, nil)
if err != nil {
currentState = "activity failed"
return err
}
currentState = "done"
return nil
}
For example, suppose your query handler function takes two parameters:
err := workflow.SetQueryHandler(ctx, "current_state", func(prefix string, suffix string) (string, error) {
return prefix + currentState + suffix, nil
})
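The way a parameterized Query handler reads the latest state at call time can be sketched in plain Python. This is a model, not Temporal SDK code: QueryRegistrySketch, set_query_handler, and query are hypothetical names that loosely mirror the handler registration shown above.

```python
from typing import Callable, Dict


class QueryRegistrySketch:
    """Plain-Python model of Query handler lookup; not Temporal SDK code."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[..., str]] = {}

    def set_query_handler(self, query_type: str, handler: Callable[..., str]) -> None:
        self._handlers[query_type] = handler

    def query(self, query_type: str, *args: str) -> str:
        handler = self._handlers.get(query_type)
        if handler is None:
            raise LookupError(f"unknown query type: {query_type}")
        # Query arguments are passed through to the handler.
        return handler(*args)


registry = QueryRegistrySketch()
state = {"current": "started"}

# The handler closes over the state, so each Query sees the latest value.
registry.set_query_handler(
    "current_state",
    lambda prefix, suffix: prefix + state["current"] + suffix,
)

first_answer = registry.query("current_state", "[", "]")
state["current"] = "waiting timer"  # the Workflow makes progress
second_answer = registry.query("current_state", "[", "]")
print(first_answer, second_answer)  # [started] [waiting timer]
```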
In Java, to handle a Query in the Workflow, create a Query handler using the @QueryMethod annotation in the Workflow interface and define it in the Workflow implementation.
The @QueryMethod annotation indicates that the method is used to handle a Query that is sent to the Workflow Execution.
The method can have parameters that can be used to filter data that the Query returns.
Because the method returns a value, it must have a return type that is not void.
The Query name defaults to the name of the method.
In the following example, the Query name defaults to getStatus.
@WorkflowInterface
public interface FileProcessingWorkflow {
@QueryMethod
String getStatus();
}
To override this default naming and assign a custom Query name, use the @QueryMethod annotation with the name parameter. In the following example, the Query name is set to "history".
@WorkflowInterface
public interface FileProcessingWorkflow {
@QueryMethod(name = "history")
String getStatus();
}
A Workflow Definition interface can define multiple methods annotated with @QueryMethod, but the method names or the name parameters for each must be unique.
The following Workflow interface has a Query method getCount() to handle Queries to this Workflow.
@WorkflowInterface
public interface HelloWorld {
@WorkflowMethod
void sayHello(String name);
@QueryMethod
int getCount();
}
The following example is the Workflow implementation with the Query method defined in the HelloWorld Workflow interface from the previous example.
public static class HelloWorldImpl implements HelloWorld {
private static final Logger logger = Workflow.getLogger(HelloWorldImpl.class);
private String greeting = "Hello";
private int count = 0;
@Override
public void sayHello(String name) {
while (!"Bye".equals(greeting)) {
logger.info(++count + ": " + greeting + " " + name + "!");
String oldGreeting = greeting;
Workflow.await(() -> !Objects.equals(greeting, oldGreeting));
}
logger.info(++count + ": " + greeting + " " + name + "!");
}
@Override
public int getCount() {
return count;
}
}
Dynamic Query Handler
You can also implement Query handlers dynamically. This is useful for library-level code and implementation of DSLs.
Use Workflow.registerListener(Object) to register an implementation of the DynamicQueryHandler in the Workflow implementation code.
Workflow.registerListener(
(DynamicQueryHandler)
(queryName, encodedArgs) -> name = encodedArgs.get(0, String.class));
When registered, any Queries sent to the Workflow without a defined handler will be delivered to the DynamicQueryHandler.
Note that you can only register one Workflow.registerListener(Object) per Workflow Execution.
DynamicQueryHandler can be implemented in both regular and dynamic Workflow implementations.
In PHP, you can add custom Query types to handle Queries such as Querying the current state of a Workflow, or Querying how many Activities the Workflow has completed. To do this, set up a Query handler using the QueryMethod method attribute or Workflow::registerQueryHandler.
#[Workflow\WorkflowInterface]
class YourWorkflow
{
#[Workflow\QueryMethod]
public function getValue()
{
return 42;
}
#[Workflow\WorkflowMethod]
public function run()
{
// workflow code
}
}
The handler function can receive any number of input parameters, but all input parameters must be
serializable. The following sample code sets up a Query handler that handles the Query type of
current_state
:
#[Workflow\WorkflowInterface]
class YourWorkflow
{
private string $currentState;
#[Workflow\QueryMethod('current_state')]
public function getCurrentState(): string
{
return $this->currentState;
}
#[Workflow\WorkflowMethod]
public function run()
{
// Your normal Workflow code begins here, and you update the currentState
// as the code makes progress.
$this->currentState = 'waiting timer';
try{
yield Workflow::timer(DateInterval::createFromDateString('1 hour'));
} catch (\Throwable $e) {
$this->currentState = 'timer failed';
throw $e;
}
$yourActivity = Workflow::newActivityStub(
YourActivityInterface::class,
ActivityOptions::new()->withScheduleToStartTimeout(60)
);
$this->currentState = 'waiting activity';
try{
yield $yourActivity->doSomething('some input');
} catch (\Throwable $e) {
$this->currentState = 'activity failed';
throw $e;
}
$this->currentState = 'done';
return null;
}
}
You can also issue a Query from code using the QueryWorkflow()
API on a Temporal Client object.
Use WorkflowStub
to Query Workflow instances from your Client code (can be applied to both running and closed Workflows):
$workflow = $workflowClient->newWorkflowStub(
YourWorkflow::class,
WorkflowOptions::new()
);
$workflowClient->start($workflow);
var_dump($workflow->getCurrentState());
sleep(60);
var_dump($workflow->getCurrentState());
To send a Query to the Workflow, use the query
method from the WorkflowHandle
class.
await handle.query("some query")
Use setHandler
to handle Queries inside a Workflow.
You make a Query with handle.query(query, ...args)
. A Query needs a return value, but can also take arguments.
- TypeScript
- JavaScript
export async function trackState(): Promise<void> {
const state = new Map<string, number>();
setHandler(setValueSignal, (key, value) => void state.set(key, value));
setHandler(getValueQuery, (key) => state.get(key));
await CancellationScope.current().cancelRequested;
}
export async function trackState() {
const state = new Map();
setHandler(setValueSignal, (key, value) => void state.set(key, value));
setHandler(getValueQuery, (key) => state.get(key));
await CancellationScope.current().cancelRequested;
}
Send Query
Queries are sent from a Temporal Client.
- Go
- Java
- PHP
- Python
- TypeScript
Use the QueryWorkflow()
API or the QueryWorkflowWithOptions
API on the Temporal Client to send a Query to a Workflow Execution.
// ...
response, err := temporalClient.QueryWorkflow(context.Background(), workflowID, runID, queryType)
if err != nil {
// ...
}
// ...
You can pass an arbitrary number of arguments to the QueryWorkflow()
function.
// ...
response, err := temporalClient.QueryWorkflow(context.Background(), workflowID, runID, queryType, "foo", "baz")
if err != nil {
// ...
}
// ...
The QueryWorkflowWithOptions()
API provides similar functionality, but with the ability to set additional configurations through QueryWorkflowWithOptionsRequest.
When using this API, you will also receive a structured response of type QueryWorkflowWithOptionsResponse.
// ...
response, err := temporalClient.QueryWorkflowWithOptions(context.Background(), &client.QueryWorkflowWithOptionsRequest{
WorkflowID: workflowID,
RunID: runID,
QueryType: queryType,
Args: args,
})
if err != nil {
// ...
}
To send a Query to a Workflow Execution from an external process, call the Query method (defined in the Workflow) from a WorkflowStub
within the Client code.
For example, the following Client code calls a Query method queryGreeting()
defined in the GreetingWorkflow
Workflow interface.
// Create our workflow options
WorkflowOptions workflowOptions =
WorkflowOptions.newBuilder()
.setWorkflowId(WORKFLOW_ID)
.setTaskQueue(TASK_QUEUE).build();
// Create the Temporal client stub. It is used to start our workflow execution.
GreetingWorkflow workflow = client.newWorkflowStub(GreetingWorkflow.class, workflowOptions);
// Start our workflow asynchronously to not use another thread to query.
WorkflowClient.start(workflow::createGreeting, "World");
// Query the Workflow to get the current value of greeting and print it.
System.out.println(workflow.queryGreeting());
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
To send a Query to a Workflow Execution from Client code, use the query()
method on the Workflow handle.
await my_workflow_handle.query(MyWorkflow.my_query, "my query arg")
Use WorkflowHandle.query
to query a running or completed Workflow.
- TypeScript
- JavaScript
import { Client } from '@temporalio/client';
import { getValueQuery } from './workflows';
async function run(): Promise<void> {
const client = new Client();
const handle = client.workflow.getHandle('state-id-0');
const meaning = await handle.query(getValueQuery, 'meaning-of-life');
console.log({ meaning });
}
import { Client } from '@temporalio/client';
import { getValueQuery } from './workflows';
async function run() {
const client = new Client();
const handle = client.workflow.getHandle('state-id-0');
const meaning = await handle.query(getValueQuery, 'meaning-of-life');
console.log({ meaning });
}
Workflow timeouts
Each Workflow timeout controls the maximum duration of a different aspect of a Workflow Execution.
Workflow timeouts are set when starting the Workflow Execution.
- Workflow Execution Timeout: restricts the maximum amount of time that a single Workflow Execution can be executing (have an Open status), including retries and any usage of Continue-As-New.
- Workflow Run Timeout: restricts the maximum amount of time that a single Workflow Run can last.
- Workflow Task Timeout: restricts the maximum amount of time that a Worker can execute a Workflow Task.
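The difference between the Workflow Execution Timeout and the Workflow Run Timeout can be illustrated with a small plain-Python model. It assumes a Workflow Execution is a chain of Runs (for example, through retries or Continue-As-New), that the Run Timeout bounds each Run on its own, and that the Execution Timeout bounds the whole chain. The function `first_timeout` is hypothetical and does not use any Temporal SDK.

```python
from datetime import timedelta
from typing import List, Optional


def first_timeout(
    run_durations: List[timedelta],
    execution_timeout: Optional[timedelta] = None,
    run_timeout: Optional[timedelta] = None,
) -> Optional[str]:
    """Return "run" or "execution" for the first timeout that trips, else None."""
    elapsed = timedelta()
    for duration in run_durations:
        limits = []
        if run_timeout is not None:
            # The Run Timeout restarts with every Run in the chain.
            limits.append((run_timeout, "run"))
        if execution_timeout is not None:
            # The Execution Timeout is shared by all Runs in the chain.
            limits.append((execution_timeout - elapsed, "execution"))
        if limits:
            limit, kind = min(limits)
            if duration > limit:
                return kind
        elapsed += duration
    return None


minutes = lambda n: timedelta(minutes=n)
chain = [minutes(10), minutes(10), minutes(10)]
outcome = first_timeout(chain, execution_timeout=minutes(25), run_timeout=minutes(15))
```

Here every Run stays under the 15-minute Run Timeout, but the third Run pushes the chain past the 25-minute Execution Timeout, so `outcome` is `"execution"`.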
- Go
- Java
- PHP
- Python
- TypeScript
Create an instance of StartWorkflowOptions
from the go.temporal.io/sdk/client
package, set a timeout, and pass the instance to the ExecuteWorkflow
call.
Available timeouts are:
WorkflowExecutionTimeout
WorkflowRunTimeout
WorkflowTaskTimeout
workflowOptions := client.StartWorkflowOptions{
// ...
// Set Workflow Timeout duration
WorkflowExecutionTimeout: time.Hour * 24 * 365 * 10,
// WorkflowRunTimeout: time.Hour * 24 * 365 * 10,
// WorkflowTaskTimeout: time.Second * 10,
// ...
}
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}
Create an instance of WorkflowStub
in the Client code and set your timeout.
Available timeouts are:
setWorkflowExecutionTimeout()
setWorkflowRunTimeout()
setWorkflowTaskTimeout()
//create Workflow stub for YourWorkflowInterface
YourWorkflowInterface workflow1 =
WorkerGreet.greetclient.newWorkflowStub(
YourWorkflowInterface.class,
WorkflowOptions.newBuilder()
.setWorkflowId("YourWorkflow")
.setTaskQueue(WorkerGreet.TASK_QUEUE)
// Set Workflow Timeout duration
.setWorkflowExecutionTimeout(Duration.ofSeconds(10))
// .setWorkflowRunTimeout(Duration.ofSeconds(10))
// .setWorkflowTaskTimeout(Duration.ofSeconds(10))
.build());
Create an instance of WorkflowOptions
in the Client code and set your timeout.
Available timeouts are:
withWorkflowExecutionTimeout()
withWorkflowRunTimeout()
withWorkflowTaskTimeout()
$workflow = $this->workflowClient->newWorkflowStub(
DynamicSleepWorkflowInterface::class,
WorkflowOptions::new()
->withWorkflowId(DynamicSleepWorkflow::WORKFLOW_ID)
->withWorkflowIdReusePolicy(WorkflowIdReusePolicy::WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE)
// Set Workflow Timeout duration
->withWorkflowExecutionTimeout(CarbonInterval::minutes(2))
// ->withWorkflowRunTimeout(CarbonInterval::minute(2))
// ->withWorkflowTaskTimeout(CarbonInterval::minute(2))
);
Set the timeout from either the start_workflow()
or execute_workflow()
asynchronous methods.
Available timeouts are:
execution_timeout
run_timeout
task_timeout
handle = await client.start_workflow(
"your-workflow-name",
"some arg",
id="your-workflow-id",
task_queue="your-task-queue",
start_signal="your-signal-name",
# Set Workflow Timeout duration
execution_timeout=timedelta(seconds=2),
# run_timeout=timedelta(seconds=2),
# task_timeout=timedelta(seconds=2),
)
result = await client.execute_workflow(
"your-workflow-name",
"some arg",
id="your-workflow-id",
task_queue="your-task-queue",
start_signal="your-signal-name",
# Set Workflow Timeout duration
execution_timeout=timedelta(seconds=2),
# run_timeout=timedelta(seconds=2),
# task_timeout=timedelta(seconds=2),
)
Create an instance of WorkflowOptions
from the Client and set your Workflow Timeout.
Available timeouts are:
workflowExecutionTimeout
workflowRunTimeout
workflowTaskTimeout
- TypeScript
- JavaScript
await client.workflow.start(example, {
taskQueue,
workflowId,
workflowExecutionTimeout: '1 day',
});
await client.workflow.start(example, {
taskQueue,
workflowId,
workflowExecutionTimeout: '1 day',
});
- TypeScript
- JavaScript
await client.workflow.start(example, {
taskQueue,
workflowId,
workflowRunTimeout: '1 minute',
});
await client.workflow.start(example, {
taskQueue,
workflowId,
workflowRunTimeout: '1 minute',
});
- TypeScript
- JavaScript
await client.workflow.start(example, {
taskQueue,
workflowId,
workflowTaskTimeout: '1 minute',
});
await client.workflow.start(example, {
taskQueue,
workflowId,
workflowTaskTimeout: '1 minute',
});
Workflow retries
A Retry Policy can work in cooperation with the timeouts to provide fine controls to optimize the execution experience.
Use a Retry Policy to retry a Workflow Execution in the event of a failure.
A Retry Policy is a collection of attributes that instructs the Temporal Server how to retry a failure of a Workflow Execution or an Activity Task Execution.
Workflow Executions do not retry by default, and Retry Policies should be used with Workflow Executions only in certain situations.
- Go
- Java
- PHP
- Python
- TypeScript
Create an instance of a RetryPolicy
from the go.temporal.io/sdk/temporal
package and provide it as the value to the RetryPolicy
field of the instance of StartWorkflowOptions
.
- Type:
RetryPolicy
- Default: None
retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Second * 100,
}
workflowOptions := client.StartWorkflowOptions{
RetryPolicy: retrypolicy,
// ...
}
workflowRun, err := temporalClient.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
// ...
}
To set Workflow Retry Options in the WorkflowStub instance, use WorkflowOptions.Builder.setRetryOptions.
- Type:
RetryOptions
- Default:
Null
which means no retries will be attempted.
//create Workflow stub for GreetWorkflowInterface
GreetWorkflowInterface workflow1 =
WorkerGreet.greetclient.newWorkflowStub(
GreetWorkflowInterface.class,
WorkflowOptions.newBuilder()
.setWorkflowId("GreetWF")
.setTaskQueue(WorkerGreet.TASK_QUEUE)
// Set Workflow Retry Options
.setRetryOptions(RetryOptions.newBuilder().build())
.build());
A Retry Policy can be configured with an instance of the RetryOptions
object.
To enable retries for a Workflow, you need to provide a Retry Policy object via ChildWorkflowOptions
for Child Workflows or via WorkflowOptions
for top-level Workflows.
$workflow = $this->workflowClient->newWorkflowStub(
CronWorkflowInterface::class,
WorkflowOptions::new()->withRetryOptions(
RetryOptions::new()->withInitialInterval(120)
)
);
Set the Retry Policy from either the start_workflow()
or execute_workflow()
asynchronous methods.
handle = await client.start_workflow(
"your-workflow-name",
"some arg",
id="your-workflow-id",
task_queue="your-task-queue",
start_signal="your-signal-name",
retry_policy=RetryPolicy(maximum_interval=timedelta(seconds=2)),
)
result = await client.execute_workflow(
"your-workflow-name",
"some arg",
id="your-workflow-id",
task_queue="your-task-queue",
start_signal="your-signal-name",
retry_policy=RetryPolicy(maximum_interval=timedelta(seconds=2)),
)
Create an instance of the Retry Policy, known as retry
in TypeScript, from the WorkflowOptions
of the Client interface.
- TypeScript
- JavaScript
const handle = await client.workflow.start(example, {
taskQueue,
workflowId,
retry: {
maximumAttempts: 3,
},
});
const handle = await client.workflow.start(example, {
taskQueue,
workflowId,
retry: {
maximumAttempts: 3,
},
});
Activity timeouts
Each Activity timeout controls the maximum duration of a different aspect of an Activity Execution.
The following timeouts are available in the Activity Options.
- Schedule-To-Close Timeout: the maximum amount of time allowed for the overall Activity Execution, from when the first Activity Task is scheduled to when the last Activity Task in the chain of Activity Tasks that make up the Activity Execution reaches a Closed status.
- Start-To-Close Timeout: the maximum time allowed for a single Activity Task Execution.
- Schedule-To-Start Timeout: the maximum amount of time allowed from when an Activity Task is scheduled (placed in a Task Queue) to when a Worker starts that Activity Task (picks it up from the Task Queue).
An Activity Execution must have either the Start-To-Close or the Schedule-To-Close Timeout set.
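The rule that at least one of the Start-To-Close or Schedule-To-Close Timeouts must be set can be expressed as a small validation step. The following is a plain-Python sketch of that rule only. The `ActivityTimeouts` dataclass is hypothetical and is not part of any Temporal SDK.

```python
from dataclasses import dataclass
from datetime import timedelta
from typing import Optional


@dataclass(frozen=True)
class ActivityTimeouts:
    """Hypothetical container for the three Activity timeouts."""

    schedule_to_close: Optional[timedelta] = None
    start_to_close: Optional[timedelta] = None
    schedule_to_start: Optional[timedelta] = None

    def __post_init__(self) -> None:
        # Schedule-To-Start alone is not enough: one of the other two
        # timeouts has to be present.
        if self.schedule_to_close is None and self.start_to_close is None:
            raise ValueError(
                "set start_to_close or schedule_to_close (or both)"
            )


valid = ActivityTimeouts(start_to_close=timedelta(seconds=5))
```

Constructing `ActivityTimeouts(schedule_to_start=timedelta(seconds=5))` raises a `ValueError` in this sketch, because neither required timeout is present.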
- Go
- Java
- PHP
- Python
- TypeScript
To set an Activity Timeout in Go, create an instance of ActivityOptions
from the go.temporal.io/sdk/workflow
package, set the Activity Timeout field, and then use the WithActivityOptions()
API to apply the options to the instance of workflow.Context
.
Available timeouts are:
StartToCloseTimeout
ScheduleToCloseTimeout
ScheduleToStartTimeout
activityoptions := workflow.ActivityOptions{
// Set Activity Timeout duration
ScheduleToCloseTimeout: 10 * time.Second,
// StartToCloseTimeout: 10 * time.Second,
// ScheduleToStartTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, activityoptions)
var yourActivityResult YourActivityResult
err = workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam).Get(ctx, &yourActivityResult)
if err != nil {
// ...
}
Set your Activity Timeout from the ActivityOptions.Builder
class.
Available timeouts are:
- setScheduleToCloseTimeout()
- setScheduleToStartTimeout()
- setStartToCloseTimeout()
You can set Activity Options using an ActivityStub
within a Workflow implementation, or per-Activity using WorkflowImplementationOptions
within a Worker.
The following uses ActivityStub
.
GreetingActivities activities = Workflow.newActivityStub(GreetingActivities.class,
ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
// .setStartToCloseTimeout(Duration.ofSeconds(2))
// .setScheduleToStartTimeout(Duration.ofSeconds(20))
.build());
The following uses WorkflowImplementationOptions
.
WorkflowImplementationOptions options =
WorkflowImplementationOptions.newBuilder()
.setActivityOptions(
ImmutableMap.of(
"GetCustomerGreeting",
// Set Activity Execution timeout
ActivityOptions.newBuilder()
.setScheduleToCloseTimeout(Duration.ofSeconds(5))
// .setStartToCloseTimeout(Duration.ofSeconds(2))
// .setScheduleToStartTimeout(Duration.ofSeconds(5))
.build()))
.build();
If you define per-Activity Type options with WorkflowImplementationOptions.setActivityOptions(), setting them again specifically with ActivityStub in a Workflow will override this setting.
Because Activities are reentrant, a single stub can be used for multiple Activity invocations.
Available timeouts are:
- withScheduleToCloseTimeout()
- withStartToCloseTimeout()
- withScheduleToStartTimeout()
$this->greetingActivity = Workflow::newActivityStub(
GreetingActivityInterface::class,
// Set Activity Timeout duration
ActivityOptions::new()
->withScheduleToCloseTimeout(CarbonInterval::seconds(2))
// ->withStartToCloseTimeout(CarbonInterval::seconds(2))
// ->withScheduleToStartTimeout(CarbonInterval::seconds(10))
);
Activity options are set as keyword arguments after the Activity arguments.
Available timeouts are:
- schedule_to_close_timeout
- schedule_to_start_timeout
- start_to_close_timeout
@workflow.defn
class YourWorkflow:
@workflow.run
async def run(self, name: str) -> str:
return await workflow.execute_activity(
your_activity,
name,
schedule_to_close_timeout=timedelta(seconds=5),
# schedule_to_start_timeout=timedelta(seconds=5),
# start_to_close_timeout=timedelta(seconds=5),
)
When you call proxyActivities
in a Workflow Function, you can set a range of ActivityOptions
.
Available timeouts are:
scheduleToCloseTimeout
startToCloseTimeout
scheduleToStartTimeout
// Sample of typical options you can set
const { greet } = proxyActivities<typeof activities>({
scheduleToCloseTimeout: '5m',
// startToCloseTimeout: "30s", // recommended
// scheduleToStartTimeout: "60s",
retry: {
// default retry policy if not specified
initialInterval: '1s',
backoffCoefficient: 2,
maximumAttempts: Infinity,
maximumInterval: '100s', // 100 * initialInterval
nonRetryableErrorTypes: [],
},
});
Activity retries
A Retry Policy works in cooperation with the timeouts to provide fine controls to optimize the execution experience.
Activity Executions are automatically associated with a default Retry Policy if a custom one is not provided.
- Go
- Java
- PHP
- Python
- TypeScript
To set a RetryPolicy, create an instance of ActivityOptions from the go.temporal.io/sdk/workflow package, set the RetryPolicy field, and then use the WithActivityOptions() API to apply the options to the instance of workflow.Context.
- Type:
RetryPolicy
- Default:
retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Second * 100, // 100 * InitialInterval
MaximumAttempts: 0, // Unlimited
NonRetryableErrorTypes: []string{}, // empty
}
Providing a Retry Policy here is a customization, and overwrites individual Field defaults.
retrypolicy := &temporal.RetryPolicy{
InitialInterval: time.Second,
BackoffCoefficient: 2.0,
MaximumInterval: time.Second * 100,
}
activityoptions := workflow.ActivityOptions{
RetryPolicy: retrypolicy,
}
ctx = workflow.WithActivityOptions(ctx, activityoptions)
var yourActivityResult YourActivityResult
err = workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam).Get(ctx, &yourActivityResult)
if err != nil {
// ...
}
To set a Retry Policy, known as the Retry Options in Java, use ActivityOptions.newBuilder().setRetryOptions().
Type:
RetryOptions
Default: Server-defined Activity Retry policy.
With ActivityStub:
private final ActivityOptions options =
ActivityOptions.newBuilder()
// note that either StartToCloseTimeout or ScheduleToCloseTimeout are
// required when setting Activity options.
.setStartToCloseTimeout(Duration.ofSeconds(5))
.setRetryOptions(
RetryOptions.newBuilder()
.setInitialInterval(Duration.ofSeconds(1))
.setMaximumInterval(Duration.ofSeconds(10))
.build())
.build();
With WorkflowImplementationOptions:
WorkflowImplementationOptions options =
WorkflowImplementationOptions.newBuilder()
.setActivityOptions(
ImmutableMap.of(
"EmailCustomerGreeting",
ActivityOptions.newBuilder()
// note that either StartToCloseTimeout or ScheduleToCloseTimeout are
// required when setting Activity options.
.setStartToCloseTimeout(Duration.ofSeconds(5))
.setRetryOptions(
RetryOptions.newBuilder()
.setDoNotRetry(NullPointerException.class.getName())
.build())
.build()))
.build();
To set an Activity Retry, set RetryOptions on ActivityOptions.
The following example creates a new Activity with the given options.
$this->greetingActivity = Workflow::newActivityStub(
GreetingActivityInterface::class,
ActivityOptions::new()
->withScheduleToCloseTimeout(CarbonInterval::seconds(10))
->withRetryOptions(
RetryOptions::new()
->withInitialInterval(CarbonInterval::seconds(1))
->withMaximumAttempts(5)
->withNonRetryableExceptions([\InvalidArgumentException::class])
)
);
}
For an executable code sample, see ActivityRetry sample in the PHP samples repository.
To create an Activity Retry Policy in Python, set the RetryPolicy class within the start_activity()
or execute_activity()
function.
The following example sets the maximum interval to 2 seconds.
workflow.execute_activity(
your_activity,
name,
start_to_close_timeout=timedelta(seconds=10),
retry_policy=RetryPolicy(maximum_interval=timedelta(seconds=2)),
)
To set Activity Retry Policies in TypeScript, pass ActivityOptions.retry
to proxyActivities
.
// Sample of typical options you can set
const { yourActivity } = proxyActivities<typeof activities>({
// ...
retry: {
// default retry policy if not specified
initialInterval: '1s',
backoffCoefficient: 2,
maximumAttempts: Infinity,
maximumInterval: '100s', // 100 * initialInterval
nonRetryableErrorTypes: [],
},
});
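The default values listed in this section (an initial interval of one second, a backoff coefficient of 2, and a maximum interval of 100 times the initial interval) determine how the wait between attempts grows. The following plain-Python sketch works through that arithmetic without using any Temporal SDK. It assumes that each wait is the previous wait multiplied by the coefficient and capped at the maximum interval. The helper name `retry_intervals` is hypothetical.

```python
from datetime import timedelta
from typing import List


def retry_intervals(
    initial: timedelta,
    coefficient: float,
    maximum: timedelta,
    attempts: int,
) -> List[timedelta]:
    """Return the wait before each retry for a given number of attempts."""
    waits: List[timedelta] = []
    current = min(initial, maximum)
    # N attempts are separated by N - 1 waits.
    for _ in range(max(attempts - 1, 0)):
        waits.append(current)
        # Grow the wait geometrically, but never beyond the cap.
        current = min(current * coefficient, maximum)
    return waits


schedule = retry_intervals(
    initial=timedelta(seconds=1),
    coefficient=2.0,
    maximum=timedelta(seconds=100),
    attempts=9,
)
seconds = [wait.total_seconds() for wait in schedule]
```

With these inputs the waits double from 1 second up to 64 seconds, and the eighth wait is capped at 100 seconds rather than growing to 128.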
Activity retry simulator
Use this tool to visualize total Activity Execution times and experiment with different Activity timeouts and Retry Policies.
The simulator is based on a common Activity use-case, which is to call a third party HTTP API and return the results. See the example code snippets below.
Use the Activity Retries settings to configure how long the API request takes to succeed or fail. There is an option to generate scenarios. The Task Time in Queue simulates the time the Activity Task might be waiting in the Task Queue.
Use the Activity Timeouts and Retry Policy settings to see how they impact the success or failure of an Activity Execution.
Sample Activity
import axios from 'axios';
async function testActivity(url: string): Promise<void> {
await axios.get(url);
}
export default testActivity;
Simulator panels: Activity Retries (in ms), Activity Timeouts (in ms), Retry Policy (in ms).
Sample result: Success after 1 ms
{
"startToCloseTimeout": 10000,
"retryPolicy": {
"backoffCoefficient": 2,
"initialInterval": 1000
}
}
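For readers without access to the interactive simulator, the same kind of experiment can be approximated offline. The sketch below is a simplified plain-Python model, not the simulator's actual logic: it assumes an attempt that runs longer than the Start-To-Close Timeout is cut off at the timeout and counted as a failure, and that the wait between attempts grows by the backoff coefficient with no maximum interval. The function `simulate_activity` and its parameters are hypothetical.

```python
from typing import List, Tuple


def simulate_activity(
    attempts: List[Tuple[int, bool]],
    start_to_close_ms: int,
    initial_interval_ms: int,
    backoff_coefficient: int,
) -> Tuple[str, int]:
    """Return ("success" | "failure", total elapsed milliseconds).

    Each attempt is (duration_ms, would_succeed).
    """
    elapsed = 0
    wait = initial_interval_ms
    for index, (duration_ms, succeeds) in enumerate(attempts):
        if duration_ms > start_to_close_ms:
            # The attempt is cut off by the Start-To-Close Timeout.
            elapsed += start_to_close_ms
            succeeds = False
        else:
            elapsed += duration_ms
        if succeeds:
            return "success", elapsed
        if index < len(attempts) - 1:
            # Back off before the next attempt.
            elapsed += wait
            wait *= backoff_coefficient
    return "failure", elapsed


# Same settings as the JSON configuration above.
quick = simulate_activity([(1, True)], 10000, 1000, 2)
slow = simulate_activity([(500, False), (20000, True), (300, True)], 10000, 1000, 2)
```

In the second scenario the first attempt fails after 500 ms, the second is cut off at the 10000 ms timeout, and the third succeeds, for a total of 13800 ms including the 1000 ms and 2000 ms waits.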
Activity Heartbeats
An Activity Heartbeat is a ping from the Worker Process that is executing the Activity to the Temporal Cluster.
Each Heartbeat informs the Temporal Cluster that the Activity Execution is making progress and the Worker has not crashed.
If the Cluster does not receive a Heartbeat within a Heartbeat Timeout time period, the Activity will be considered failed and another Activity Task Execution may be scheduled according to the Retry Policy.
Heartbeats may not always be sent to the Cluster; they may be throttled by the Worker.
Activity Cancellations are delivered to Activities from the Cluster when they Heartbeat. Activities that don't Heartbeat can't receive a Cancellation. Heartbeat throttling may lead to Cancellation getting delivered later than expected.
Heartbeats can contain a details
field describing the Activity's current progress.
If an Activity gets retried, the Activity can access the details
from the last Heartbeat that was sent to the Cluster.
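The checkpoint-and-resume pattern that Heartbeat details enable can be shown with an in-memory stand-in for the Cluster. This is a conceptual plain-Python sketch and does not use any Temporal SDK. `HeartbeatStore` and `process_items` are hypothetical names, and the crash is simulated with an exception.

```python
from typing import List, Optional


class HeartbeatStore:
    """Hypothetical stand-in for the details the Cluster keeps for an Activity."""

    def __init__(self) -> None:
        self.details: Optional[int] = None

    def heartbeat(self, details: int) -> None:
        # Only the most recent details are kept.
        self.details = details


def process_items(
    items: List[str], store: HeartbeatStore, fail_at: Optional[int] = None
) -> List[str]:
    # Resume after the last index recorded by a previous attempt, if any.
    start = 0 if store.details is None else store.details + 1
    processed: List[str] = []
    for index in range(start, len(items)):
        if index == fail_at:
            raise RuntimeError(f"simulated crash at item {index}")
        processed.append(items[index])
        store.heartbeat(index)  # Record progress after each item.
    return processed


store = HeartbeatStore()
items = ["a", "b", "c", "d"]
try:
    process_items(items, store, fail_at=2)  # First attempt crashes at "c".
except RuntimeError:
    pass
resumed = process_items(items, store)  # The retry skips "a" and "b".
```

The retry reads the last recorded index and processes only `"c"` and `"d"`, which is the behavior the SDK-specific examples in this section implement with their own Heartbeat APIs.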
- Go
- Java
- PHP
- Python
- TypeScript
To Heartbeat in an Activity in Go, use the RecordHeartbeat API.
import (
"context"
// ...
"go.temporal.io/sdk/activity"
// ...
)
func YourActivityDefinition(ctx context.Context, param YourActivityDefinitionParam) (YourActivityDefinitionResult, error) {
// ...
activity.RecordHeartbeat(ctx, details)
// ...
}
When an Activity Task Execution times out due to a missed Heartbeat, the last value of the details
variable above is returned to the calling Workflow in the details
field of TimeoutError
with TimeoutType
set to Heartbeat
.
You can also Heartbeat an Activity from an external source:
// The client is a heavyweight object that should be created once per process.
temporalClient, err := client.Dial(client.Options{})
// Record heartbeat.
err = temporalClient.RecordActivityHeartbeat(ctx, taskToken, details)
The parameters of the RecordActivityHeartbeat
function are:
- taskToken: The value of the binary TaskToken field of the ActivityInfo struct retrieved inside the Activity.
- details: The serializable payload containing progress information.
If an Activity Execution Heartbeats its progress before it failed, the retry attempt will have access to the progress information, so that the Activity Execution can resume from the failed state. Here's an example of how this can be implemented:
func SampleActivity(ctx context.Context, inputArg InputParams) error {
startIdx := inputArg.StartIndex
if activity.HasHeartbeatDetails(ctx) {
// Recover from finished progress.
var finishedIndex int
if err := activity.GetHeartbeatDetails(ctx, &finishedIndex); err == nil {
startIdx = finishedIndex + 1 // Start from next one.
}
}
// Normal Activity logic...
for i := startIdx; i < inputArg.EndIndex; i++ {
// Code for processing item i goes here...
activity.RecordHeartbeat(ctx, i) // Report progress.
}
return nil
}
To Heartbeat an Activity Execution in Java, use the Activity.getExecutionContext().heartbeat()
Class method.
public class YourActivityDefinitionImpl implements YourActivityDefinition {
@Override
public String yourActivityMethod(YourActivityMethodParam param) {
// ...
Activity.getExecutionContext().heartbeat(details);
// ...
}
// ...
}
The method takes an optional argument, the details
variable above that represents latest progress of the Activity Execution.
This method can take a variety of types such as an exception object, custom object, or string.
If the Activity Execution times out, the last Heartbeat details
are included in the thrown ActivityTimeoutException
, which can be caught by the calling Workflow.
The Workflow can then use the details
information to pass to the next Activity invocation if needed.
In the case of Activity retries, the last Heartbeat's details
are available and can be extracted from the last failed attempt by using Activity.getExecutionContext().getHeartbeatDetails(Class<V> detailsClass).
Some Activities are long-running.
To react to a crash quickly, use the Heartbeat mechanism, Activity::heartbeat()
, which lets the Temporal Server know that the Activity is still alive.
This acts as a periodic checkpoint mechanism for the progress of an Activity.
You can piggyback details
on an Activity Heartbeat.
If an Activity times out, the last value of details
is included in the TimeoutFailure
delivered to a Workflow.
Then the Workflow can pass the details to the next Activity invocation.
Additionally, you can access the details from within an Activity via Activity::getHeartbeatDetails
.
When an Activity is retried after a failure, getHeartbeatDetails
enables you to get the value from the last successful Heartbeat.
use Temporal\Activity;
class FileProcessingActivitiesImpl implements FileProcessingActivities
{
// ...
public function download(
string $bucketName,
string $remoteName,
string $localName
): void
{
$this->downloader->downloadWithProgress(
$bucketName,
$remoteName,
$localName,
// on progress
function ($progress) {
Activity::heartbeat($progress);
}
);
Activity::heartbeat(100); // download complete
// ...
}
// ...
}
To Heartbeat an Activity Execution in Python, use the heartbeat()
API.
@activity.defn
async def your_activity_definition() -> str:
activity.heartbeat("heartbeat details!")
In addition to obtaining cancellation information, Heartbeats also support detail data that persists on the server for retrieval during Activity retry.
If an Activity calls heartbeat(123, 456)
and then fails and is retried, heartbeat_details
returns an iterable containing 123
and 456
on the next Run.
Long-running Activities should Heartbeat their progress back to the Workflow for earlier detection of stalled Activities (with Heartbeat Timeout) and resuming stalled Activities from checkpoints (with Heartbeat details).
To set Activity Heartbeat, use Context.current().heartbeat()
in your Activity implementation, and set heartbeatTimeout
in your Workflow.
- TypeScript
- JavaScript
// activity implementation
export async function example(sleepIntervalMs = 1000): Promise<void> {
  for (let progress = 1; progress <= 1000; ++progress) {
    await Context.current().sleep(sleepIntervalMs);
    // record activity heartbeat
    Context.current().heartbeat();
  }
}
// ...
// workflow code calling activity
const { example } = proxyActivities<typeof activities>({
  startToCloseTimeout: '1 hour',
  heartbeatTimeout: '10s',
});

// activity implementation
export async function example(sleepIntervalMs = 1000) {
  for (let progress = 1; progress <= 1000; ++progress) {
    await Context.current().sleep(sleepIntervalMs);
    // record activity heartbeat
    Context.current().heartbeat();
  }
}
// ...
// workflow code calling activity
const { example } = proxyActivities({
  startToCloseTimeout: '1 hour',
  heartbeatTimeout: '10s',
});
In the previous example, setting the Heartbeat informs the Temporal Server of the Activity's progress at regular intervals.
If the Activity stalls or the Activity Worker becomes unavailable, no Heartbeat arrives within heartbeatTimeout, so the Temporal Server times out that attempt and can retry the Activity without waiting for startToCloseTimeout to elapse.
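To make the timing concrete, here is a small model of when a stalled attempt would be detected. It is not SDK code: the function `detection_time` and its arguments are hypothetical, and it only compares the two timeouts discussed above.

```python
from typing import Sequence


def detection_time(
    heartbeat_times: Sequence[float],
    heartbeat_timeout: float,
    start_to_close_timeout: float,
) -> float:
    """Seconds after start at which a stalled attempt would be timed out.

    heartbeat_times holds the seconds (since the attempt started) at which
    Heartbeats were recorded before the Activity stalled.
    """
    last_signal = max(heartbeat_times, default=0.0)
    # The Heartbeat Timeout counts from the last recorded Heartbeat
    # (or from the start of the attempt if none was recorded).
    heartbeat_deadline = last_signal + heartbeat_timeout
    # Whichever deadline comes first ends the attempt.
    return min(heartbeat_deadline, start_to_close_timeout)


# Heartbeats at 1s, 2s, and 3s, then the Worker crashes.
with_heartbeats = detection_time(
    [1, 2, 3], heartbeat_timeout=10, start_to_close_timeout=3600
)
# Without a Heartbeat Timeout, only the Start-To-Close Timeout applies.
without_heartbeats = detection_time(
    [], heartbeat_timeout=float("inf"), start_to_close_timeout=3600
)
```

With a 10 second Heartbeat Timeout the stall is noticed after 13 seconds in this model, instead of after the full hour.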
You can also add heartbeatDetails
as a checkpoint to collect data about failures during the execution, and use it to resume the Activity from that point.
The following example extends the previous sample to include a heartbeatDetails
checkpoint.
- TypeScript
- JavaScript
export async function example(sleepIntervalMs = 1000): Promise<void> {
  const startingPoint = Context.current().info.heartbeatDetails || 1; // allow for resuming from heartbeat
  for (let progress = startingPoint; progress <= 100; ++progress) {
    await Context.current().sleep(sleepIntervalMs);
    Context.current().heartbeat(progress);
  }
}

export async function example(sleepIntervalMs = 1000) {
  const startingPoint = Context.current().info.heartbeatDetails || 1; // allow for resuming from heartbeat
  for (let progress = startingPoint; progress <= 100; ++progress) {
    await Context.current().sleep(sleepIntervalMs);
    Context.current().heartbeat(progress);
  }
}
In this example, when the heartbeatTimeout
is reached and the Activity is retried, the Activity Worker picks up the execution from where the previous attempt left off.
Heartbeat Timeout
A Heartbeat Timeout works in conjunction with Activity Heartbeats.
An Activity Heartbeat is a ping from the Worker that is executing the Activity to the Temporal Cluster. Each ping informs the Temporal Cluster that the Activity Execution is making progress and the Worker has not crashed.
- Go
- Java
- PHP
- Python
- TypeScript
To set a Heartbeat Timeout, create an instance of ActivityOptions from the go.temporal.io/sdk/workflow package, set the HeartbeatTimeout field, and then use the WithActivityOptions() API to apply the options to the instance of workflow.Context.
activityoptions := workflow.ActivityOptions{
HeartbeatTimeout: 10 * time.Second,
}
ctx = workflow.WithActivityOptions(ctx, activityoptions)
var yourActivityResult YourActivityResult
err = workflow.ExecuteActivity(ctx, YourActivityDefinition, yourActivityParam).Get(ctx, &yourActivityResult)
if err != nil {
// ...
}
To set a Heartbeat Timeout, use ActivityOptions.newBuilder().setHeartbeatTimeout().
- Type:
Duration
- Default: None
You can set Activity Options using an ActivityStub
within a Workflow implementation, or per-Activity using WorkflowImplementationOptions
within a Worker.
Note that if you define per-Activity Type options with WorkflowImplementationOptions.setActivityOptions(), setting them again specifically with ActivityStub in a Workflow will override this setting.
With ActivityStub
private final GreetingActivities activities =
    Workflow.newActivityStub(
        GreetingActivities.class,
        ActivityOptions.newBuilder()
            // note that either StartToCloseTimeout or ScheduleToCloseTimeout are
            // required when setting Activity options.
            .setStartToCloseTimeout(Duration.ofSeconds(5))
            .setHeartbeatTimeout(Duration.ofSeconds(2))
            .build());
With WorkflowImplementationOptions
WorkflowImplementationOptions options =
WorkflowImplementationOptions.newBuilder()
.setActivityOptions(
ImmutableMap.of(
"EmailCustomerGreeting",
ActivityOptions.newBuilder()
// note that either StartToCloseTimeout or ScheduleToCloseTimeout are
// required when setting Activity options.
.setStartToCloseTimeout(Duration.ofSeconds(5))
.setHeartbeatTimeout(Duration.ofSeconds(2))
.build()))
.build();
Some Activities are long-running.
To react to a crash quickly, use the Heartbeat mechanism, Activity::heartbeat()
, which lets the Temporal Server know that the Activity is still alive.
This acts as a periodic checkpoint mechanism for the progress of an Activity.
You can piggyback details
on an Activity Heartbeat.
If an Activity times out, the last value of details
is included in the TimeoutFailure
delivered to a Workflow.
Then the Workflow can pass the details to the next Activity invocation.
Additionally, you can access the details from within an Activity via Activity::getHeartbeatDetails
.
When an Activity is retried after a failure, getHeartbeatDetails enables you to get the value from the last successful Heartbeat.
use Temporal\Activity;
class FileProcessingActivitiesImpl implements FileProcessingActivities
{
// ...
public function download(
string $bucketName,
string $remoteName,
string $localName
): void
{
$this->downloader->downloadWithProgress(
$bucketName,
$remoteName,
$localName,
// on progress
function ($progress) {
Activity::heartbeat($progress);
}
);
Activity::heartbeat(100); // download complete
// ...
}
// ...
}
heartbeat_timeout is a keyword parameter of the start_activity() function that sets the maximum time between Activity Heartbeats.
workflow.start_activity(
    activity="your-activity",
    schedule_to_close_timeout=timedelta(seconds=5),
    heartbeat_timeout=timedelta(seconds=1),
)
execute_activity()
is a shortcut for start_activity()
that waits on its result.
To get just the handle to wait and cancel separately, use start_activity()
. execute_activity()
should be used in most cases unless advanced task capabilities are needed.
workflow.execute_activity(
    "your-activity",
    name,
    schedule_to_close_timeout=timedelta(seconds=5),
    heartbeat_timeout=timedelta(seconds=1),
)
To set a Heartbeat Timeout, use ActivityOptions.heartbeatTimeout
. If the Activity takes longer than that between heartbeats, the Activity is failed.
// Creating a proxy for the activity.
const { longRunningActivity } = proxyActivities<typeof activities>({
// translates to 300000 ms
scheduleToCloseTimeout: '5m',
// translates to 30000 ms
startToCloseTimeout: '30s',
// equivalent to '10 seconds'
heartbeatTimeout: 10000,
});
Asynchronous Activity Completion
Asynchronous Activity Completion enables the Activity Function to return without the Activity Execution completing.
It occurs when an external system provides the final result of a computation, started by an Activity, to the Temporal System.
There are three steps to follow:
- The Activity provides the external system with identifying information needed to complete the Activity Execution. Identifying information can be a Task Token (a unique Id that correlates to an Activity Execution), or a combination of Namespace, Workflow Id, and Activity Id.
- The Activity Function completes in a way that identifies it as waiting to be completed by an external system.
- The Temporal Client is used to Heartbeat and complete the Activity.
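The three steps can be modeled in plain Python, without the Temporal SDK. Everything here (`FakeService`, `RESULT_PENDING`, the token format, the `outbox` list) is a hypothetical stand-in meant only to show the control flow: the Activity hands out a token and returns without a result, and a separate caller later completes it by token.

```python
import uuid
from typing import Dict, List, Optional

RESULT_PENDING = object()  # sentinel: "an external system will complete this"


class FakeService:
    """Hypothetical stand-in for the Temporal service; tracks open Activities."""

    def __init__(self) -> None:
        self.results: Dict[bytes, Optional[str]] = {}

    def start_activity(self) -> bytes:
        token = uuid.uuid4().bytes  # plays the role of a Task Token
        self.results[token] = None  # None means "not completed yet"
        return token

    def complete(self, token: bytes, result: str) -> None:
        if token not in self.results:
            raise KeyError("unknown task token")
        self.results[token] = result


def activity(token: bytes, outbox: List[bytes]) -> object:
    # Step 1: give the external system the identifying information.
    outbox.append(token)
    # Step 2: return in a way that signals "waiting for external completion".
    return RESULT_PENDING


service = FakeService()
outbox: List[bytes] = []
token = service.start_activity()
returned = activity(token, outbox)
still_open = service.results[token] is None

# Step 3: the external system completes the Activity using the token.
service.complete(outbox.pop(), "approved")
final_result = service.results[token]
```

In a real application the token would travel to another process or service, and the completion call would go through a Temporal Client as described in the language-specific sections that follow.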
- Go
- Java
- PHP
- Python
- TypeScript
- Provide the external system with a Task Token to complete the Activity Execution.
To do this, use the GetInfo() API from the go.temporal.io/sdk/activity package.
// Retrieve the Activity information needed to asynchronously complete the Activity.
activityInfo := activity.GetInfo(ctx)
taskToken := activityInfo.TaskToken
// Send the taskToken to the external service that will complete the Activity.
- Return an
activity.ErrResultPending
error to indicate that the Activity is completing asynchronously.
return "", activity.ErrResultPending
- Use the Temporal Client to complete the Activity using the Task Token.
// Instantiate a Temporal service client.
// The same client can be used to complete or fail any number of Activities.
// The client is a heavyweight object that should be created once per process.
temporalClient, err := client.Dial(client.Options{})
// Complete the Activity.
temporalClient.CompleteActivity(context.Background(), taskToken, result, nil)
The following are the parameters of the CompleteActivity function:
- taskToken: The value of the binary TaskToken field of the ActivityInfo struct retrieved inside the Activity.
- result: The return value to record for the Activity. The type of this value must match the type of the return value declared by the Activity function.
- err: The error code to return if the Activity terminates with an error.
If err is not nil, the value of the result field is ignored.
To fail the Activity, you would do the following:
// Fail the Activity.
temporalClient.CompleteActivity(context.Background(), taskToken, nil, err)
To complete an Activity asynchronously, use the ActivityCompletionClient interface and call its complete() method.
@Override
public String composeGreeting(String greeting, String name) {
// Get the activity execution context
ActivityExecutionContext context = Activity.getExecutionContext();
// Set a correlation token that can be used to complete the activity asynchronously
byte[] taskToken = context.getTaskToken();
/**
* For the example we will use a {@link java.util.concurrent.ForkJoinPool} to execute our
* activity. In real-life applications this could be any service. The composeGreetingAsync
* method is the one that will actually complete workflow action execution.
*/
ForkJoinPool.commonPool().execute(() -> composeGreetingAsync(taskToken, greeting, name));
context.doNotCompleteOnReturn();
// Since we have set doNotCompleteOnReturn(), the workflow action method return value is
// ignored.
return "ignored";
}
// Method that will complete action execution using the defined ActivityCompletionClient
private void composeGreetingAsync(byte[] taskToken, String greeting, String name) {
String result = greeting + " " + name + "!";
// Complete our workflow activity using ActivityCompletionClient
completionClient.complete(taskToken, result);
}
}
Alternatively, call the doNotCompleteOnReturn() method during an Activity Execution.
@Override
public String composeGreeting(String greeting, String name) {
// Get the activity execution context
ActivityExecutionContext context = Activity.getExecutionContext();
// Set a correlation token that can be used to complete the activity asynchronously
byte[] taskToken = context.getTaskToken();
/**
* For the example we will use a {@link java.util.concurrent.ForkJoinPool} to execute our
* activity. In real-life applications this could be any service. The composeGreetingAsync
* method is the one that will actually complete workflow action execution.
*/
ForkJoinPool.commonPool().execute(() -> composeGreetingAsync(taskToken, greeting, name));
context.doNotCompleteOnReturn();
// Since we have set doNotCompleteOnReturn(), the workflow action method return value is
// ignored.
return "ignored";
}
When this method is called during an Activity Execution, the Activity Execution does not complete when its method returns.
Sometimes Workflows need to perform certain operations in parallel.
Invoking an Activity stub without yield returns a promise for the Activity result, which can be resolved at a later moment.
Calling yield on the promise blocks until a result is available.
The Activity promise also exposes a then method to construct promise chains. Read more about Promises here.
Alternatively, you can explicitly wrap your code (including yield constructs) using Workflow::async, which executes the nested code in parallel with the main Workflow code.
Call yield on the Promise returned by Workflow::async to merge the execution result back into the primary Workflow method.
public function greet(string $name): \Generator
{
// Workflow::async runs its activities and child workflows in a separate coroutine. Use the yield keyword to merge
// it back into the parent process.
$first = Workflow::async(
function () use ($name) {
$hello = yield $this->greetingActivity->composeGreeting('Hello', $name);
$bye = yield $this->greetingActivity->composeGreeting('Bye', $name);
return $hello . '; ' . $bye;
}
);
$second = Workflow::async(
function () use ($name) {
$hello = yield $this->greetingActivity->composeGreeting('Hola', $name);
$bye = yield $this->greetingActivity->composeGreeting('Chao', $name);
return $hello . '; ' . $bye;
}
);
// blocks until $first and $second complete
return (yield $first) . "\n" . (yield $second);
}
Async completion
There are certain scenarios when moving on from an Activity upon completion of its function is not possible or desirable. For example, you might have an application that requires user input to complete the Activity. You could implement the Activity with a polling mechanism, but a simpler and less resource-intensive implementation is to asynchronously complete a Temporal Activity.
There are two parts to implementing an asynchronously completed Activity:
- The Activity provides the information necessary for completion from an external system and notifies the Temporal service that it is waiting for that outside callback.
- The external service calls the Temporal service to complete the Activity.
The following example demonstrates the first part:
app/src/AsyncActivityCompletion/GreetingActivity.php
class GreetingActivity implements GreetingActivityInterface
{
private LoggerInterface $logger;
public function __construct()
{
$this->logger = new Logger();
}
/**
* Demonstrates how to implement an Activity asynchronously.
* When {@link Activity::doNotCompleteOnReturn()} is called,
* the Activity implementation function that returns doesn't complete the Activity.
*/
public function composeGreeting(string $greeting, string $name): string
{
// In real life this request can be executed anywhere. By a separate service for example.
$this->logger->info(sprintf('GreetingActivity token: %s', base64_encode(Activity::getInfo()->taskToken)));
// Send the taskToken to the external service that will complete the Activity.
// Return from the Activity a function indicating that Temporal should wait
// for an async completion message.
Activity::doNotCompleteOnReturn();
// When doNotCompleteOnReturn() is invoked the return value is ignored.
return 'ignored';
}
}
The following code demonstrates how to complete the Activity successfully using WorkflowClient
:
app/src/AsyncActivityCompletion/CompleteCommand.php
$client = $this->workflowClient->newActivityCompletionClient();
// Complete the Activity.
$client->completeByToken(
base64_decode($input->getArgument('token')),
$input->getArgument('message')
);
To fail the Activity, you would do the following:
// Fail the Activity.
$client->completeExceptionallyByToken($taskToken, new \Error("activity failed"));
To mark an Activity as completing asynchronously, do the following inside the Activity.
# Capture token for later completion
captured_token = activity.info().task_token
activity.raise_complete_async()
To update an Activity outside the Activity, use the get_async_activity_handle() method to get the handle of the Activity.
handle = my_client.get_async_activity_handle(task_token=captured_token)
Then, on that handle, you can call the heartbeat, complete, fail, or report_cancellation method to update the Activity.
await handle.complete("Completion value.")
To asynchronously complete an Activity, call AsyncCompletionClient.complete
.
activities-examples/src/activities/async-completion.ts
- TypeScript
- JavaScript
import { CompleteAsyncError, Context } from '@temporalio/activity';
import { AsyncCompletionClient } from '@temporalio/client';
export async function doSomethingAsync(): Promise<string> {
const taskToken = Context.current().info.taskToken;
setTimeout(() => doSomeWork(taskToken), 1000);
throw new CompleteAsyncError();
}
// this work could be done in a different process or on a different machine
async function doSomeWork(taskToken: Uint8Array): Promise<void> {
const client = new AsyncCompletionClient();
// does some work...
await client.complete(taskToken, 'Job\'s done!');
}
import { CompleteAsyncError, Context } from '@temporalio/activity';
import { AsyncCompletionClient } from '@temporalio/client';
export async function doSomethingAsync() {
const taskToken = Context.current().info.taskToken;
setTimeout(() => doSomeWork(taskToken), 1000);
throw new CompleteAsyncError();
}
// this work could be done in a different process or on a different machine
async function doSomeWork(taskToken) {
const client = new AsyncCompletionClient();
// does some work...
await client.complete(taskToken, 'Job\'s done!');
}
Cancel an Activity
Canceling an Activity from within a Workflow requires that the Activity Execution sends Heartbeats and sets a Heartbeat Timeout.
If the Heartbeat is not invoked, the Activity cannot receive a cancellation request.
When any non-immediate Activity is executed, the Activity Execution should send Heartbeats and set a Heartbeat Timeout to ensure that the server knows it is still working.
When an Activity is canceled, an error is raised in the Activity at the next available opportunity.
If cleanup logic needs to be performed, it can be done in a finally
clause or inside a caught cancel error.
However, for the Activity to appear canceled the exception needs to be re-raised.
Unlike regular Activities, Local Activities (Activity Executions that execute in the same process as the Workflow Execution that spawns them) can be canceled even if they don't send Heartbeats.
Local Activities are handled locally, and all the information needed to handle the cancellation logic is available in the same Worker process.
- Go
- Java
- PHP
- Python
- TypeScript
For Go, Java, and PHP: Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
To cancel an Activity from a Workflow Execution, call the cancel() method on the Activity handle that is returned from start_activity().
import asyncio
from datetime import timedelta
from typing import NoReturn

from temporalio import activity, workflow

# ComposeArgsInput is assumed to be defined elsewhere.


@activity.defn
async def cancellable_activity(input: ComposeArgsInput) -> NoReturn:
    try:
        while True:
            print("Heartbeating cancel activity")
            await asyncio.sleep(0.5)
            activity.heartbeat("some details")
    except asyncio.CancelledError:
        print("Activity cancelled")
        raise


@activity.defn
async def run_activity(input: ComposeArgsInput):
    print("Executing activity")
    return input.arg1 + input.arg2


@workflow.defn
class GreetingWorkflow:
    @workflow.run
    async def run(self, input: ComposeArgsInput) -> None:
        activity_handle = workflow.start_activity(
            cancellable_activity,
            ComposeArgsInput(input.arg1, input.arg2),
            start_to_close_timeout=timedelta(minutes=5),
            heartbeat_timeout=timedelta(seconds=30),
        )
        await asyncio.sleep(3)
        activity_handle.cancel()
The Activity handle is a Python task.
By calling cancel()
, you're essentially requesting the task to be canceled.
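Because the handle behaves like an `asyncio` task, the cancellation pattern can be tried with the standard library alone. The sketch below uses no Temporal APIs, and `worker` and the `events` list are hypothetical names. It shows cleanup running when the cancel error is caught, and the error being re-raised so that the task ends up canceled.

```python
import asyncio
from typing import List


async def worker(events: List[str]) -> None:
    try:
        while True:
            # Stands in for a unit of work followed by a Heartbeat.
            await asyncio.sleep(0.01)
    except asyncio.CancelledError:
        events.append("cleanup")  # cleanup logic goes here
        raise  # re-raise so the task is reported as canceled


async def main() -> List[str]:
    events: List[str] = []
    task = asyncio.create_task(worker(events))
    await asyncio.sleep(0.05)  # let the task start running
    task.cancel()  # request cancellation of the task
    try:
        await task
    except asyncio.CancelledError:
        events.append("canceled" if task.cancelled() else "finished")
    return events


result = asyncio.run(main())
```

If `worker` swallowed the error instead of re-raising it, `task.cancelled()` would be false and the task would look as if it had finished normally.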
For TypeScript: Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Child Workflows
A Child Workflow Execution is a Workflow Execution that is scheduled from within another Workflow using a Child Workflow API.
When using a Child Workflow API, Child Workflow related Events (StartChildWorkflowExecutionInitiated, ChildWorkflowExecutionStarted, ChildWorkflowExecutionCompleted, etc...) are logged in the Workflow Execution Event History.
Always block progress until the ChildWorkflowExecutionStarted Event is logged to the Event History to ensure the Child Workflow Execution has started.
After that, Child Workflow Executions may be abandoned by setting the Abandon Parent Close Policy in the Child Workflow Options (the default policy is Terminate).
To be sure that the Child Workflow Execution has started, first call the method on the Child Workflow future that returns the Child Workflow Execution; this call returns a different future.
Then wait on that second future, which resolves once the Child Workflow Execution has spawned.
- Go
- Java
- PHP
- Python
- TypeScript
To spawn a Child Workflow Execution in Go, use the ExecuteChildWorkflow API, which is available from the go.temporal.io/sdk/workflow package.
The ExecuteChildWorkflow
call requires an instance of workflow.Context
, with an instance of workflow.ChildWorkflowOptions
applied to it, the Workflow Type, and any parameters that should be passed to the Child Workflow Execution.
workflow.ChildWorkflowOptions
contain the same fields as client.StartWorkflowOptions
.
Workflow Option fields automatically inherit their values from the Parent Workflow Options if they are not explicitly set.
If a custom WorkflowID
is not set, one is generated when the Child Workflow Execution is spawned.
Use the WithChildOptions
API to apply Child Workflow Options to the instance of workflow.Context
.
The ExecuteChildWorkflow
call returns an instance of a ChildWorkflowFuture
.
Call the .Get()
method on the instance of ChildWorkflowFuture
to wait for the result.
func YourWorkflowDefinition(ctx workflow.Context, params ParentParams) (ParentResp, error) {
childWorkflowOptions := workflow.ChildWorkflowOptions{}
ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)
var result ChildResp
err := workflow.ExecuteChildWorkflow(ctx, YourOtherWorkflowDefinition, ChildParams{}).Get(ctx, &result)
if err != nil {
// ...
}
// ...
return resp, nil
}
func YourOtherWorkflowDefinition(ctx workflow.Context, params ChildParams) (ChildResp, error) {
// ...
return resp, nil
}
To asynchronously spawn a Child Workflow Execution, the Child Workflow must have an "Abandon" Parent Close Policy set in the Child Workflow Options.
Additionally, the Parent Workflow Execution must wait for the ChildWorkflowExecutionStarted
Event to appear in its Event History before it completes.
If the Parent makes the ExecuteChildWorkflow
call and then immediately completes, the Child Workflow Execution does not spawn.
To be sure that the Child Workflow Execution has started, first call the GetChildWorkflowExecution
method on the instance of the ChildWorkflowFuture
, which will return a different Future.
Then call the Get()
method on that Future, which is what will wait until the Child Workflow Execution has spawned.
import (
// ...
"go.temporal.io/api/enums/v1"
)
func YourWorkflowDefinition(ctx workflow.Context, params ParentParams) (ParentResp, error) {
childWorkflowOptions := workflow.ChildWorkflowOptions{
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
}
ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)
childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, YourOtherWorkflowDefinition, ChildParams{})
// Wait for the Child Workflow Execution to spawn
var childWE workflow.Execution
if err := childWorkflowFuture.GetChildWorkflowExecution().Get(ctx, &childWE); err != nil {
return ParentResp{}, err
}
// ...
return resp, nil
}
func YourOtherWorkflowDefinition(ctx workflow.Context, params ChildParams) (ChildResp, error) {
// ...
return resp, nil
}
The first call to the Child Workflow stub must always be its Workflow method (method annotated with @WorkflowMethod
).
Similar to Activities, Child Workflow methods can be invoked synchronously, or asynchronously by using Async#function or Async#procedure.
The synchronous call blocks until a Child Workflow method completes.
The asynchronous call returns a Promise
which can be used to wait for the completion of the Child Workflow method, as in the following example:
GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);
Promise<String> greeting = Async.function(child::composeGreeting, "Hello", name);
// ...
greeting.get()
To execute an untyped Child Workflow asynchronously, call executeAsync
on the ChildWorkflowStub
, as shown in the following example.
//...
ChildWorkflowStub childUntyped =
Workflow.newUntypedChildWorkflowStub(
"GreetingChild", // your workflow type
ChildWorkflowOptions.newBuilder().setWorkflowId("childWorkflow").build());
Promise<String> greeting =
childUntyped.executeAsync(String.class, String.class, "Hello", name);
String result = greeting.get();
//...
The following examples show how to spawn a Child Workflow:
Spawn a Child Workflow from a Workflow:
// Child Workflow interface
@WorkflowInterface
public interface GreetingChild {
@WorkflowMethod
String composeGreeting(String greeting, String name);
}
// Child Workflow implementation not shown
// Parent Workflow implementation
public class GreetingWorkflowImpl implements GreetingWorkflow {
@Override
public String getGreeting(String name) {
GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);
// This is a blocking call that returns only after child has completed.
return child.composeGreeting("Hello", name );
}
}
Spawn two Child Workflows (with the same type) in parallel:
// Parent Workflow implementation
public class GreetingWorkflowImpl implements GreetingWorkflow {
@Override
public String getGreeting(String name) {
// Workflows are stateful, so a new stub must be created for each new child.
GreetingChild child1 = Workflow.newChildWorkflowStub(GreetingChild.class);
Promise<String> greeting1 = Async.function(child1::composeGreeting, "Hello", name);
// Both children will run concurrently.
GreetingChild child2 = Workflow.newChildWorkflowStub(GreetingChild.class);
Promise<String> greeting2 = Async.function(child2::composeGreeting, "Bye", name);
// Do something else here.
...
return "First: " + greeting1.get() + ", second: " + greeting2.get();
}
}
Send a Signal to a Child Workflow from the parent:
// Child Workflow interface
@WorkflowInterface
public interface GreetingChild {
@WorkflowMethod
String composeGreeting(String greeting, String name);
@SignalMethod
void updateName(String name);
}
// Parent Workflow implementation
public class GreetingWorkflowImpl implements GreetingWorkflow {
@Override
public String getGreeting(String name) {
GreetingChild child = Workflow.newChildWorkflowStub(GreetingChild.class);
Promise<String> greeting = Async.function(child::composeGreeting, "Hello", name);
child.updateName("Temporal");
return greeting.get();
}
}
Sending a Query to Child Workflows from within the parent Workflow code is not supported. However, you can send a Query to Child Workflows from Activities using WorkflowClient.
Related reads:
- How to develop a Workflow Definition in Java
- Java Workflow reference: https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/workflow/package-summary.html
Besides Activities, a Workflow can also start other Workflows.
Workflow::executeChildWorkflow and Workflow::newChildWorkflowStub enable the scheduling of other Workflows from within a Workflow's implementation.
The parent Workflow has the ability to monitor and impact the lifecycle of the Child Workflow, similar to the way it does for an Activity that it invoked.
// Use one stub per child workflow run
$child = Workflow::newChildWorkflowStub(
ChildWorkflowInterface::class,
ChildWorkflowOptions::new()
// Do not specify WorkflowId if you want Temporal to generate a unique Id
// for the child execution.
->withWorkflowId('BID-SIMPLE-CHILD-WORKFLOW')
->withExecutionStartToCloseTimeout(DateInterval::createFromDateString('30 minutes'))
);
// This is a non blocking call that returns immediately.
// Use yield $child->workflowMethod(name) to call synchronously.
$promise = $child->workflowMethod('value');
// Do something else here.
try{
$value = yield $promise;
} catch(TemporalException $e) {
$logger->error('child workflow failed');
throw $e;
}
Let's take a look at each component of this call.
Before calling $child->workflowMethod()
, you must configure ChildWorkflowOptions
for the invocation.
These options customize various execution timeouts, and are passed into the Workflow stub defined by the Workflow::newChildWorkflowStub
.
Once the stub is created, you can invoke its Workflow method, which is identified by the WorkflowMethod attribute.
The method call returns a Promise immediately.
This allows you to execute more code without having to wait for the scheduled Workflow to complete.
When you are ready to process the results of the Workflow, use yield $promise on the returned promise object.
When a parent Workflow is cancelled by the user, the Child Workflow can be cancelled or abandoned based on a configurable child policy.
You can also skip the stub part of Child Workflow initiation and use Workflow::executeChildWorkflow
directly:
// Execute the Child Workflow directly, without creating a stub
$childResult = yield Workflow::executeChildWorkflow(
'ChildWorkflowName',
['args'],
ChildWorkflowOptions::new()->withWorkflowId('BID-SIMPLE-CHILD-WORKFLOW'),
Type::TYPE_STRING // optional: defines the return type
);
To spawn a Child Workflow Execution in Python, use the execute_child_workflow()
function. execute_child_workflow()
starts the Child Workflow and waits for completion.
await workflow.execute_child_workflow(MyWorkflow.run, "my child arg", id="my-child-id")
Alternatively, use the start_child_workflow()
function to start a Child Workflow and return its handle. This is useful if you want to do something after it has only started, or to get the workflow/run ID, or to be able to signal it while running. To wait for completion, simply await
the handle. execute_child_workflow()
is a helper function for start_child_workflow()
+ await handle
.
await workflow.start_child_workflow(MyWorkflow.run, "my child arg", id="my-child-id")
To start a Child Workflow and return a handle to it, use startChild
.
To start a Child Workflow Execution and await its completion, use executeChild
.
By default, a child is scheduled on the same Task Queue as the parent.
child-workflows/src/workflows.ts
- TypeScript
- JavaScript
import { executeChild } from '@temporalio/workflow';
export async function parentWorkflow(...names: string[]): Promise<string> {
const responseArray = await Promise.all(
names.map((name) =>
executeChild(childWorkflow, {
args: [name],
// workflowId, // add business-meaningful workflow id here
// // regular workflow options apply here, with two additions (defaults shown):
// cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
// parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE
})
),
);
return responseArray.join('\n');
}
import { executeChild } from '@temporalio/workflow';
export async function parentWorkflow(...names) {
const responseArray = await Promise.all(names.map((name) => executeChild(childWorkflow, {
args: [name],
// workflowId, // add business-meaningful workflow id here
// // regular workflow options apply here, with two additions (defaults shown):
// cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
// parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE
})));
return responseArray.join('\n');
}
Parent Close Policy
A Parent Close Policy determines what happens to a Child Workflow Execution if its Parent changes to a Closed status (Completed, Failed, or Timed Out).
The default Parent Close Policy option is set to terminate the Child Workflow Execution.
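As a rough mental model only, the effect of each policy can be sketched in plain Python. This is not Temporal SDK code: the ParentClosePolicy enum, the Child record, the status strings, and the close_parent() helper below are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass
from enum import Enum
from typing import List


class ParentClosePolicy(Enum):
    """Illustrative stand-in for the three policy values, not the SDK enum."""
    TERMINATE = "terminate"
    ABANDON = "abandon"
    REQUEST_CANCEL = "request_cancel"


@dataclass
class Child:
    """Hypothetical record of a running Child Workflow Execution."""
    workflow_id: str
    policy: ParentClosePolicy = ParentClosePolicy.TERMINATE  # default, per the text
    status: str = "Running"


def close_parent(children: List[Child]) -> None:
    """Apply each child's policy when the parent reaches a Closed status."""
    for child in children:
        if child.status != "Running":
            continue  # children that already closed are unaffected
        if child.policy is ParentClosePolicy.TERMINATE:
            child.status = "Terminated"
        elif child.policy is ParentClosePolicy.REQUEST_CANCEL:
            child.status = "CancelRequested"
        # ABANDON: the child keeps running untouched


children = [
    Child("child-a"),  # uses the default policy
    Child("child-b", ParentClosePolicy.ABANDON),
    Child("child-c", ParentClosePolicy.REQUEST_CANCEL),
]
close_parent(children)
statuses = {c.workflow_id: c.status for c in children}
```

In this sketch only the abandoned child is still running after the parent closes, which is why ABANDON is the option to reach for when a child must outlive its parent.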
- Go
- Java
- PHP
- Python
- TypeScript
In Go, a Parent Close Policy is set on the ParentClosePolicy field of an instance of workflow.ChildWorkflowOptions.
The possible values can be obtained from the go.temporal.io/api/enums/v1 package.
- PARENT_CLOSE_POLICY_ABANDON
- PARENT_CLOSE_POLICY_TERMINATE
- PARENT_CLOSE_POLICY_REQUEST_CANCEL
The Child Workflow Options are then applied to the instance of workflow.Context by using the WithChildOptions API, which is then passed to the ExecuteChildWorkflow() call.
- Type: ParentClosePolicy
- Default: PARENT_CLOSE_POLICY_TERMINATE
import (
    // ...
    "go.temporal.io/api/enums/v1"
)
func YourWorkflowDefinition(ctx workflow.Context, params ParentParams) (ParentResp, error) {
    // ...
    childWorkflowOptions := workflow.ChildWorkflowOptions{
        // ...
        ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
    }
    ctx = workflow.WithChildOptions(ctx, childWorkflowOptions)
    childWorkflowFuture := workflow.ExecuteChildWorkflow(ctx, YourOtherWorkflowDefinition, ChildParams{})
    // ...
}
func YourOtherWorkflowDefinition(ctx workflow.Context, params ChildParams) (ChildResp, error) {
    // ...
    return resp, nil
}
Set Parent Close Policy on an instance of ChildWorkflowOptions using ChildWorkflowOptions.newBuilder().setParentClosePolicy.
- Type: ChildWorkflowOptions.Builder
- Default: PARENT_CLOSE_POLICY_TERMINATE
public void parentWorkflow() {
    ChildWorkflowOptions options =
        ChildWorkflowOptions.newBuilder()
            .setParentClosePolicy(ParentClosePolicy.PARENT_CLOSE_POLICY_ABANDON)
            .build();
    MyChildWorkflow child = Workflow.newChildWorkflowStub(MyChildWorkflow.class, options);
    Async.procedure(child::<workflowMethod>, <args>...);
    Promise<WorkflowExecution> childExecution = Workflow.getWorkflowExecution(child);
    // Wait for child to start
    childExecution.get();
}
In this example, we are:
1. Setting ChildWorkflowOptions.ParentClosePolicy to ABANDON when creating a Child Workflow stub.
2. Starting Child Workflow Execution asynchronously using Async.function or Async.procedure.
3. Calling Workflow.getWorkflowExecution(…) on the child stub.
4. Waiting for the Promise returned by getWorkflowExecution to complete. This indicates whether the Child Workflow started successfully (or failed).
5. Completing parent Workflow Execution asynchronously.
Steps 3 and 4 are needed to ensure that a Child Workflow Execution starts before the parent closes. If the parent initiates a Child Workflow Execution and then completes immediately after, the Child Workflow will never execute.
In PHP, a Parent Close Policy is set via the ChildWorkflowOptions object and its withParentClosePolicy() method.
The possible values can be obtained from the ParentClosePolicy class.
- POLICY_TERMINATE
- POLICY_ABANDON
- POLICY_REQUEST_CANCEL
The ChildWorkflowOptions object is then used to create a new Child Workflow object:
$child = Workflow::newUntypedChildWorkflowStub(
    'child-workflow',
    ChildWorkflowOptions::new()
        ->withParentClosePolicy(ParentClosePolicy::POLICY_ABANDON)
);
yield $child->start();
In the snippet above we:
- Create a new untyped Child Workflow stub with Workflow::newUntypedChildWorkflowStub.
- Provide a ChildWorkflowOptions object with Parent Close Policy set to ParentClosePolicy::POLICY_ABANDON.
- Start the Child Workflow Execution asynchronously using yield and the start() method.
We need yield here to ensure that a Child Workflow Execution starts before the parent closes.
Set the parent_close_policy parameter inside the start_child_workflow() function or the execute_child_workflow() function to specify the behavior of the Child Workflow when the Parent Workflow closes.
async def run(self, name: str) -> str:
    return await workflow.execute_child_workflow(
        ComposeGreeting.run,
        ComposeGreetingInput("Hello", name),
        id="hello-child-workflow-workflow-child-id",
        parent_close_policy=workflow.ParentClosePolicy.TERMINATE,
    )
execute_child_workflow() is a shortcut function for temporalio.workflow.start_child_workflow() plus await handle.
To specify how a Child Workflow reacts to a Parent Workflow reaching a Closed state, use the parentClosePolicy option.
child-workflows/src/workflows.ts
- TypeScript
- JavaScript
import { executeChild } from '@temporalio/workflow';
export async function parentWorkflow(...names: string[]): Promise<string> {
  const responseArray = await Promise.all(
    names.map((name) =>
      executeChild(childWorkflow, {
        args: [name],
        // workflowId, // add business-meaningful workflow id here
        // // regular workflow options apply here, with two additions (defaults shown):
        // cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
        // parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE
      })
    ),
  );
  return responseArray.join('\n');
}
import { executeChild } from '@temporalio/workflow';
export async function parentWorkflow(...names) {
  const responseArray = await Promise.all(names.map((name) => executeChild(childWorkflow, {
    args: [name],
    // workflowId, // add business-meaningful workflow id here
    // // regular workflow options apply here, with two additions (defaults shown):
    // cancellationType: ChildWorkflowCancellationType.WAIT_CANCELLATION_COMPLETED,
    // parentClosePolicy: ParentClosePolicy.PARENT_CLOSE_POLICY_TERMINATE
  })));
  return responseArray.join('\n');
}
Continue-As-New
Continue-As-New enables a Workflow Execution to close successfully and create a new Workflow Execution in a single atomic operation if the number of Events in the Event History is becoming too large. The Workflow Execution spawned from the use of Continue-As-New has the same Workflow Id, a new Run Id, and a fresh Event History and is passed all the appropriate parameters.
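To make the "same Workflow Id, new Run Id, fresh Event History" relationship concrete, here is a small plain-Python model. It is only a sketch and does not use any Temporal SDK: the Run class, the continue_as_new() helper, the status strings, and the MAX_EVENTS threshold are all hypothetical.

```python
import uuid
from dataclasses import dataclass, field
from typing import List


@dataclass
class Run:
    """Hypothetical model of one Workflow Execution in a chain."""
    workflow_id: str
    state: int  # the value carried over between runs
    run_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    history: List[str] = field(default_factory=list)
    status: str = "Running"


def continue_as_new(current: Run) -> Run:
    """Close the current run and start a fresh one with the same Workflow Id."""
    current.status = "ContinuedAsNew"
    # Only the state is passed along; the history starts empty again.
    return Run(workflow_id=current.workflow_id, state=current.state)


MAX_EVENTS = 3  # tiny illustrative threshold, not a real service limit

run = Run(workflow_id="periodic-job", state=0)
chain = [run]
for _ in range(7):
    if len(run.history) >= MAX_EVENTS:
        run = continue_as_new(run)
        chain.append(run)
    run.state += 1
    run.history.append(f"event for state {run.state}")
```

After seven steps the chain holds three runs that share one Workflow Id, each with its own Run Id and a history no longer than the threshold, while the counter has been carried across all of them.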
- Go
- Java
- PHP
- Python
- TypeScript
To cause a Workflow Execution to Continue-As-New, the Workflow API should return the result of the NewContinueAsNewError() function available from the go.temporal.io/sdk/workflow package.
func SimpleWorkflow(ctx workflow.Context, value string) error {
    // ...
    return workflow.NewContinueAsNewError(ctx, SimpleWorkflow, value)
}
To check whether a Workflow Execution was spawned as a result of Continue-As-New, you can check if workflow.GetInfo(ctx).ContinuedExecutionRunID is not empty (that is, not "").
Notes
- To prevent Signal loss, be sure to perform an asynchronous drain on the Signal channel. Failure to do so can result in buffered Signals being ignored and lost.
- Make sure that the previous Workflow and the Continue-As-New Workflow are referenced by the same alias. Failure to do so can cause the Workflow to Continue-As-New on an entirely different Workflow.
The Temporal SDK allows you to use Continue-As-New in various ways.
To continue execution of the same Workflow that is currently running, use:
Workflow.continueAsNew(input1, ...);
To continue execution of a currently running Workflow as a completely different Workflow Type, use Workflow.newContinueAsNewStub().
For example, in a Workflow class called YourWorkflow, we can create a Workflow stub with a different type, and call its Workflow method to continue execution as that type:
MyOtherWorkflow continueAsNew = Workflow.newContinueAsNewStub(MyOtherWorkflow.class);
continueAsNew.greet(input);
To provide ContinueAsNewOptions in Workflow.newContinueAsNewStub(), use:
ContinueAsNewOptions options = ContinueAsNewOptions.newBuilder()
    .setTaskQueue("newTaskQueueName")
    .build();
MyOtherWorkflow continueAsNew = Workflow.newContinueAsNewStub(MyOtherWorkflow.class, options);
// ...
continueAsNew.greet(input);
Providing these options allows you to continue Workflow Execution as a new Workflow run, with a different Workflow Type, and on a different Task Queue.
Java Workflow reference: https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/workflow/package-summary.html
Workflows that need to rerun periodically could naively be implemented as a big while loop with a sleep where the entire logic of the Workflow is inside the body of the while loop. The problem with this approach is that the history for that Workflow will keep growing to a point where it reaches the maximum size enforced by the service.
ContinueAsNew is the low-level construct that enables implementing such Workflows without the risk of failures down the road. The operation atomically completes the current execution and starts a new execution of the Workflow with the same Workflow Id. The new execution will not carry over any history from the old execution.
To trigger this behavior, use the Workflow::continueAsNew or Workflow::newContinueAsNewStub method:
#[Workflow\WorkflowMethod]
public function periodic(string $name, int $value = 0)
{
    for ($i = 0; $i < 100; $i++) {
        // do something
        $value++;
    }
    // maintain $value counter between runs
    return Workflow::newContinueAsNewStub(self::class)->periodic($name, $value);
}
To Continue-As-New in Python, call the continue_as_new() function from inside your Workflow, which will stop the Workflow immediately and Continue-As-New.
# The positional argument is passed as input to the new run of the Workflow.
workflow.continue_as_new("your-workflow-arg")
To cause a Workflow Execution to Continue-As-New, the Workflow function should return the result of continueAsNew.
continue-as-new/src/workflows.ts
- TypeScript
- JavaScript
import { continueAsNew, sleep } from '@temporalio/workflow';
export async function loopingWorkflow(iteration = 0): Promise<void> {
  if (iteration === 10) {
    return;
  }
  console.log('Running Workflow iteration:', iteration);
  await sleep(1000);
  // Must match the arguments expected by `loopingWorkflow`
  await continueAsNew<typeof loopingWorkflow>(iteration + 1);
  // Unreachable code, continueAsNew is like `process.exit` and will stop execution once called.
}
import { continueAsNew, sleep } from '@temporalio/workflow';
export async function loopingWorkflow(iteration = 0) {
  if (iteration === 10) {
    return;
  }
  console.log('Running Workflow iteration:', iteration);
  await sleep(1000);
  // Must match the arguments expected by `loopingWorkflow`
  await continueAsNew(iteration + 1);
  // Unreachable code, continueAsNew is like `process.exit` and will stop execution once called.
}
Timers
A Workflow can set a durable timer for a fixed time period.
In some SDKs, the function is called sleep(), and in others, it's called timer().
A Workflow can sleep for months.
Timers are persisted, so even if your Worker or Temporal Cluster is down when the time period completes, as soon as your Worker and Cluster are back up, the sleep() call will resolve and your code will continue executing.
Sleeping is a resource-light operation: it does not tie up the process, and you can run millions of Timers off a single Worker.
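One way to picture why a persisted Timer survives downtime is that the absolute fire time is stored, not a countdown held in a process. The plain-Python sketch below illustrates that idea only. It is not how the Temporal Cluster is implemented, and TimerStore, start_timer(), and due_timers() are hypothetical names.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class TimerStore:
    """Hypothetical stand-in for persisted timer records."""
    fire_at: Dict[str, float]

    def start_timer(self, timer_id: str, now: float, seconds: float) -> None:
        # Persist the absolute fire time, not the remaining duration.
        self.fire_at[timer_id] = now + seconds

    def due_timers(self, now: float) -> List[str]:
        """Return (and clear) every timer whose fire time has passed."""
        due = sorted(t for t, at in self.fire_at.items() if at <= now)
        for timer_id in due:
            del self.fire_at[timer_id]
        return due


store = TimerStore(fire_at={})
store.start_timer("reminder", now=0.0, seconds=60.0)

fired_early = store.due_timers(now=30.0)  # Worker is up, timer not yet due
# Imagine the Worker is down from t=40 to t=500; the record stays in the store.
fired_after_outage = store.due_timers(now=500.0)
fired_again = store.due_timers(now=600.0)  # a fired timer does not fire twice
```

Because nothing in the sketch depends on a process staying alive between start_timer() and due_timers(), the timer resolves on the first check after the outage, which mirrors the behavior described above.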
- Go
- Java
- PHP
- Python
- TypeScript
To set a Timer in Go, use the NewTimer() function and pass the duration you want to wait before continuing.
timer := workflow.NewTimer(timerCtx, duration)
To set a sleep duration in Go, use the Sleep() function and pass the duration you want to wait before continuing.
A zero or negative sleep duration causes the function to return immediately.
err := workflow.Sleep(ctx, 10*time.Second)
For more information, see the Timer example in the Go Samples repository.
To set a Timer in Java, use Workflow.sleep() and pass a Duration (or a number of milliseconds) that you want to wait before continuing.
Workflow.sleep(Duration.ofSeconds(5));
To set a Timer in PHP, use Workflow::timer() and pass the number of seconds you want to wait before continuing.
The following example yields a Timer that fires after 5 minutes.
yield Workflow::timer(300); // sleep for 5 minutes
You cannot set a Timer invocation inside the await or awaitWithTimeout methods.
To set a Timer in Python, call the asyncio.sleep() function and pass the duration in seconds you want to wait before continuing.
await asyncio.sleep(5)
To set a Timer in TypeScript, use the sleep() function and pass how long you want to wait before continuing (using an ms-formatted string or number of milliseconds).
import { sleep } from '@temporalio/workflow';
export async function sleepWorkflow(): Promise<void> {
  await sleep('2 months');
  console.log('done sleeping');
}
Temporal Cron Jobs
A Temporal Cron Job is the series of Workflow Executions that occur when a Cron Schedule is provided in the call to spawn a Workflow Execution.
A Cron Schedule is provided as an option when the call to spawn a Workflow Execution is made.
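The schedule strings used in the examples for each SDK, such as "15 8 * * *", follow the conventional five-field cron layout (minute, hour, day of month, month, day of week). The plain-Python sketch below shows how such a string can be read. It is a simplification that assumes only `*` and plain integers, it is not the parser the Temporal Server uses, and matches() is a hypothetical helper.

```python
from datetime import datetime, timezone

FIELDS = ("minute", "hour", "day of month", "month", "day of week")


def matches(schedule: str, when: datetime) -> bool:
    """Check a five-field schedule that uses only '*' or plain integers."""
    parts = schedule.split()
    if len(parts) != len(FIELDS):
        raise ValueError("expected five space-separated fields")
    actual = (
        when.minute,
        when.hour,
        when.day,
        when.month,
        (when.weekday() + 1) % 7,  # cron counts Sunday as 0; Python counts Monday as 0
    )
    return all(p == "*" or int(p) == a for p, a in zip(parts, actual))


daily = "15 8 * * *"  # every day at 08:15
hit = matches(daily, datetime(2023, 1, 2, 8, 15, tzinfo=timezone.utc))
miss = matches(daily, datetime(2023, 1, 2, 8, 16, tzinfo=timezone.utc))
every_minute = matches("* * * * *", datetime(2023, 1, 2, 23, 59, tzinfo=timezone.utc))
```

Real cron expressions also allow ranges, lists, and step values, which this sketch deliberately leaves out.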
- Go
- Java
- PHP
- Python
- TypeScript
Create an instance of StartWorkflowOptions from the go.temporal.io/sdk/client package, set the CronSchedule field, and pass the instance to the ExecuteWorkflow call.
- Type: string
- Default: None
workflowOptions := client.StartWorkflowOptions{
    CronSchedule: "15 8 * * *",
    // ...
}
workflowRun, err := c.ExecuteWorkflow(context.Background(), workflowOptions, YourWorkflowDefinition)
if err != nil {
    // ...
}
Set the Cron Schedule with the WorkflowStub instance in the Client code using WorkflowOptions.Builder.setCronSchedule.
Setting setCronSchedule changes the Workflow Execution into a Temporal Cron Job.
The default timezone for a Cron is UTC.
- Type: String
- Default: None
// create Workflow stub for YourWorkflowInterface
YourWorkflowInterface workflow1 =
    YourWorker.yourclient.newWorkflowStub(
        YourWorkflowInterface.class,
        WorkflowOptions.newBuilder()
            .setWorkflowId("YourWF")
            .setTaskQueue(YourWorker.TASK_QUEUE)
            // Set Cron Schedule
            .setCronSchedule("* * * * *")
            .build());
For more details, see the Cron Sample.
Set your Cron Schedule with CronSchedule('* * * * *').
The following example sets a Cron Schedule in PHP:
$workflow = $this->workflowClient->newWorkflowStub(
    CronWorkflowInterface::class,
    WorkflowOptions::new()
        ->withWorkflowId(CronWorkflowInterface::WORKFLOW_ID)
        ->withCronSchedule('* * * * *')
        // Execution timeout limits total time. Cron will stop executing after this timeout.
        ->withWorkflowExecutionTimeout(CarbonInterval::minutes(10))
        // Run timeout limits duration of a single workflow invocation.
        ->withWorkflowRunTimeout(CarbonInterval::minute(1))
);
$output->writeln("Starting <comment>CronWorkflow</comment>... ");
try {
    $run = $this->workflowClient->start($workflow, 'Antony');
    // ...
}
Setting withCronSchedule turns the Workflow Execution into a Temporal Cron Job.
For more information, see the PHP samples for example code or the PHP SDK WorkflowOptions source code.
You can set each Workflow to repeat on a schedule with the cron_schedule option from either the start_workflow() or execute_workflow() asynchronous methods:
await client.start_workflow(
    "your_workflow_name",
    id="your-workflow-id",
    task_queue="your-task-queue",
    cron_schedule="* * * * *",
)
You can set each Workflow to repeat on a schedule with the cronSchedule option:
const handle = await client.start(scheduledWorkflow, {
  // ...
  cronSchedule: '* * * * *', // start every minute
});
Side Effects
Side Effects are used to execute non-deterministic code, such as generating a UUID or a random number, without compromising determinism in the Workflow. This is done by storing the non-deterministic results of the Side Effect into the Workflow Event History.
A Side Effect does not re-execute during a Replay. Instead, it returns the recorded result from the Workflow Execution Event History.
Side Effects should not fail. An exception that is thrown from the Side Effect causes failure and retry of the current Workflow Task.
An Activity or a Local Activity may also be used instead of a Side Effect, as its result is also persisted in Workflow Execution History.
You shouldn't modify the Workflow state inside a Side Effect function, because it is not re-executed during Replay. A Side Effect function should only be used to return a value.
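The record-then-replay behavior described above can be sketched with a small plain-Python model. This is an illustration only, not SDK code: the History class, its side_effect() method, and the workflow() function are hypothetical.

```python
import random
from typing import Any, Callable, List, Optional


class History:
    """Hypothetical, minimal stand-in for a Workflow Execution Event History."""

    def __init__(self, recorded: Optional[List[Any]] = None) -> None:
        self.markers: List[Any] = list(recorded or [])
        self.replaying = recorded is not None
        self._cursor = 0

    def side_effect(self, fn: Callable[[], Any]) -> Any:
        if self.replaying:
            # Replay: return the recorded result without calling fn again.
            value = self.markers[self._cursor]
            self._cursor += 1
            return value
        value = fn()                # first execution: run the function once
        self.markers.append(value)  # and record its result
        return value


def workflow(history: History) -> str:
    roll = history.side_effect(lambda: random.randint(0, 99))
    return "path-a" if roll < 50 else "path-b"


first = History()
original_path = workflow(first)

replay = History(recorded=first.markers)
replayed_path = workflow(replay)
```

Because the replayed run reads the recorded value instead of rolling again, it always takes the same branch as the original run. That is also why the value must be returned from the function: anything the function only assigns to an outer variable is lost on Replay.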
- Go
- Java
- PHP
- Python
- TypeScript
Use the SideEffect function from the go.temporal.io/sdk/workflow package to execute a Side Effect directly in your Workflow.
Pass it an instance of workflow.Context and the function to execute.
The SideEffect API returns a Future, an instance of converter.EncodedValue.
Use the Get method on the Future to retrieve the result of the Side Effect.
Correct implementation
The following example demonstrates the correct way to use SideEffect:
encodedRandom := workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
    return rand.Intn(100)
})
var random int
encodedRandom.Get(&random)
// ...
Incorrect implementation
The following example demonstrates how NOT to use SideEffect:
// Warning: This is an incorrect example.
// This code is non-deterministic.
var random int
workflow.SideEffect(ctx, func(ctx workflow.Context) interface{} {
    random = rand.Intn(100)
    return nil
})
// random will always be 0 in replay, so this code is non-deterministic.
On replay, the provided function is not executed, so the random number will always be 0 and the Workflow Execution could take a different path, breaking determinism.
To use a Side Effect in Java, call Workflow.sideEffect() in your Workflow code and return the non-deterministic value from the function you pass to it.
// random is an instance of java.util.Random
int number = Workflow.sideEffect(Integer.class, () -> random.nextInt(100));
if (number < 50) {
    // ...
} else {
    // ...
}
Here's another example that uses sideEffect().
// implementation of the @WorkflowMethod
public void execute() {
    int randomInt = Workflow.sideEffect(int.class, () -> {
        Random random = new SecureRandom();
        return random.nextInt();
    });
    String userHome = Workflow.sideEffect(String.class, () -> System.getenv("USER_HOME"));
    if (randomInt % 2 == 0) {
        // ...
    } else {
        // ...
    }
}
Java also provides a deterministic method to generate random numbers or random UUIDs.
To generate random numbers in a deterministic method, use newRandom().
// implementation of the @WorkflowMethod
public void execute() {
    int randomInt = Workflow.newRandom().nextInt();
    // ...
}
To generate a random UUID in a deterministic method, use randomUUID().
// implementation of the @WorkflowMethod
public void execute() {
    String randomUUID = Workflow.randomUUID().toString();
    // ...
}
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Not applicable to this SDK.
Not applicable to this SDK.
Mutable Side Effects
A Mutable Side Effect executes the provided function once, and then looks up the History for the value with the given ID.
- If there is no existing value, then it records the function result as a value with the given ID on the History.
- If there is an existing value, then it compares whether the existing value from the History has changed from the new function result, by calling the equals function.
  - If the values are equal, then it returns the value without recording a new Marker Event.
  - If the values aren't equal, then it records the new value with the same ID on the History.
During a Workflow Execution, every new Side Effect call results in a new Marker recorded on the Workflow History, whereas a Mutable Side Effect only records a new Marker on the Workflow History if the value for the Side Effect ID changes or is set the first time.
During a Replay, a Mutable Side Effect will not execute the function again. Instead, it returns the exact same value that was returned during the Workflow Execution.
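The record-only-on-change rule can be sketched in plain Python as follows. The sketch covers only the first (non-Replay) execution and does not use any SDK: the MutableSideEffects class, its get() method, and the "max-retries" ID are hypothetical.

```python
from typing import Any, Callable, Dict, List, Tuple


class MutableSideEffects:
    """Hypothetical model of the record-only-on-change rule."""

    def __init__(self) -> None:
        self.values: Dict[str, Any] = {}
        self.markers: List[Tuple[str, Any]] = []  # one entry per recorded Marker

    def get(
        self,
        effect_id: str,
        fn: Callable[[], Any],
        equals: Callable[[Any, Any], bool],
    ) -> Any:
        new_value = fn()
        if effect_id in self.values and equals(self.values[effect_id], new_value):
            return self.values[effect_id]   # unchanged: no new Marker
        self.values[effect_id] = new_value  # first call or changed value
        self.markers.append((effect_id, new_value))
        return new_value


def same(a: Any, b: Any) -> bool:
    return a == b


config = {"max_retries": 3}
effects = MutableSideEffects()

first = effects.get("max-retries", lambda: config["max_retries"], same)
second = effects.get("max-retries", lambda: config["max_retries"], same)
config["max_retries"] = 5
third = effects.get("max-retries", lambda: config["max_retries"], same)
```

Three calls produce only two Markers here, one for the initial value and one for the change, which is the saving over a plain Side Effect that the comparison above describes.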
- Go
- Java
- PHP
- Python
- TypeScript
To use MutableSideEffect() in Go, provide a unique name within the scope of the workflow.
if err := workflow.MutableSideEffect(ctx, "configureNumber", get, eq).Get(&number); err != nil {
    panic("can't decode number:" + err.Error())
}
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Not applicable to this SDK.
Not applicable to this SDK.
Namespaces
You can create, update, deprecate or delete your Namespaces using either tctl or SDK APIs.
Use Namespaces to isolate your Workflow Executions according to your needs.
For example, you can use Namespaces to match the development lifecycle by having separate dev and prod Namespaces.
You could also use them to ensure Workflow Executions between different teams never communicate, such as ensuring that the teamA Namespace never impacts the teamB Namespace.
On Temporal Cloud, use the Temporal Cloud UI to create and manage a Namespace from the UI, or tcld commands to manage Namespaces from the command-line interface.
On self-hosted Temporal Cluster, you can register and manage your Namespaces using tctl (recommended) or programmatically using APIs. Note that these APIs and tctl commands will not work with Temporal Cloud.
Use a custom Authorizer on your Frontend Service in the Temporal Cluster to set restrictions on who can create, update, or deprecate Namespaces.
You must register a Namespace with the Temporal Cluster before setting it in the Temporal Client.
Register Namespace
Registering a Namespace creates a Namespace on the Temporal Cluster or Temporal Cloud.
On Temporal Cloud, use the Temporal Cloud UI or tcld commands to create Namespaces.
On self-hosted Temporal Cluster, you can register your Namespaces using tctl (recommended) or programmatically using APIs. Note that these APIs and tctl commands will not work with Temporal Cloud.
Use a custom Authorizer on your Frontend Service in the Temporal Cluster to set restrictions on who can create, update, or deprecate Namespaces.
- Go
- Java
- PHP
- Python
- TypeScript
Use the Register API with the NamespaceClient interface to register a Namespace and set the Retention Period for the Workflow Execution Event History for the Namespace.
You can also register Namespaces using the tctl command-line tool (tctl namespace register).
client, err := client.NewNamespaceClient(client.Options{HostPort: ts.config.ServiceAddr})
//...
err = client.Register(ctx, &workflowservice.RegisterNamespaceRequest{
    Namespace:                        "your-namespace-name",
    WorkflowExecutionRetentionPeriod: &retention,
})
The Retention Period setting using WorkflowExecutionRetentionPeriod is mandatory.
The minimum value you can set for this period is 1 day.
Once registered, set Namespace using Dial in a Workflow Client to run your Workflow Executions within that Namespace.
See how to set Namespace in a Client in Go for details.
Note that Namespace registration using this API takes up to 10 seconds to complete. Ensure that you wait for this registration to complete before starting the Workflow Execution against the Namespace.
To update your Namespace, use the Update API with the NamespaceClient.
To update your Namespace using tctl, use the tctl namespace update command.
Use the RegisterNamespace API to register a Namespace and set the Retention Period for the Workflow Execution Event History for the Namespace.
//...
import com.google.protobuf.util.Durations;
import io.temporal.api.workflowservice.v1.RegisterNamespaceRequest;
//...
public static void createNamespace(String name) {
    RegisterNamespaceRequest req = RegisterNamespaceRequest.newBuilder()
        .setNamespace(name) // for example, "your-custom-namespace"
        // Keeps the Workflow Execution Event History for up to 3 days in the Persistence store.
        // Not setting this value will throw an error.
        .setWorkflowExecutionRetentionPeriod(Durations.fromDays(3))
        .build();
    service.blockingStub().registerNamespace(req);
}
//...
The Retention Period setting using WorkflowExecutionRetentionPeriod is mandatory.
The minimum value you can set for this period is 1 day.
Once registered, set Namespace using WorkflowClientOptions within a Workflow Client to run your Workflow Executions within that Namespace.
See how to set Namespace in a Client in Java for details.
Note that Namespace registration using this API takes up to 10 seconds to complete. Ensure that you wait for this registration to complete before starting the Workflow Execution against the Namespace.
To update your Namespace, use the UpdateNamespace API with the NamespaceClient.
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.
Manage Namespaces
You can get details for your Namespaces, update Namespace configuration, and deprecate or delete your Namespaces.
On Temporal Cloud, use the Temporal Cloud UI or tcld commands to manage Namespaces.
On self-hosted Temporal Cluster, you can manage your registered Namespaces using tctl (recommended) or programmatically using APIs. Note that these APIs and tctl commands will not work with Temporal Cloud.
Use a custom Authorizer on your Frontend Service in the Temporal Cluster to set restrictions on who can create, update, or deprecate Namespaces.
You must register a Namespace with the Temporal Cluster before setting it in the Temporal Client.
- Go
- Java
- PHP
- Python
- TypeScript
Update information and configuration for a registered Namespace on your Temporal Cluster:
- With tctl: tctl namespace update
- Use the UpdateNamespace API to update configuration on a Namespace. Example:
//...
err = client.Update(context.Background(), &workflowservice.UpdateNamespaceRequest{
    Namespace: "your-namespace-name",
    UpdateInfo: &namespace.UpdateNamespaceInfo{ // updates info for the namespace "your-namespace-name"
        Description: "updated namespace description",
        OwnerEmail:  "newowner@mail.com",
        //Data: nil,
        //State: 0,
    },
    /* other details that you can update:
    Config: &namespace.NamespaceConfig{ // updates the configuration of the namespace with the following options
        //WorkflowExecutionRetentionTtl: nil,
        //BadBinaries: nil,
        //HistoryArchivalState: 0,
        //HistoryArchivalUri: "",
        //VisibilityArchivalState: 0,
        //VisibilityArchivalUri: "",
    },
    ReplicationConfig: &replication.NamespaceReplicationConfig{ // updates the replication configuration for the namespace
        //ActiveClusterName: "",
        //Clusters: nil,
        //State: 0,
    },
    SecurityToken: "",
    DeleteBadBinary: "",
    PromoteNamespace: false,
    */
})
//...
Get details for a registered Namespace on your Temporal Cluster:
- With tctl: tctl namespace describe
- Use the DescribeNamespace API to return information and configuration details for a registered Namespace. Example:
//...
client, err := client.NewNamespaceClient(client.Options{})
//...
client.Describe(context.Background(), "default")
//...
Get details for all registered Namespaces on your Temporal Cluster:
- With tctl: tctl namespace list
- Use the ListNamespaces API to return information and configuration details for all registered Namespaces on your Temporal Cluster. Example:
//...
// Lists 1 page (1-100) of namespaces on the active cluster.
// You can set a large PageSize or loop until NextPageToken is nil.
namespace.Handler.ListNamespaces(context.Background(), &workflowservice.ListNamespacesRequest{
    //PageSize: 0,
    //NextPageToken: nil,
    //NamespaceFilter: nil,
})
//...
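The "loop until NextPageToken is nil" advice in the comment above is the usual token-based pagination pattern. A generic plain-Python sketch of that loop follows. It does not call any Temporal API: list_all(), fake_fetch(), and the in-memory namespace list are hypothetical stand-ins for a real ListNamespaces call.

```python
from typing import Callable, List, Optional, Tuple

Page = Tuple[List[str], Optional[bytes]]  # (namespace names, next page token)


def list_all(fetch_page: Callable[[Optional[bytes]], Page]) -> List[str]:
    """Collect every item by following page tokens until none is returned."""
    names: List[str] = []
    token: Optional[bytes] = None
    while True:
        page, token = fetch_page(token)
        names.extend(page)
        if not token:  # an empty or missing token means this was the last page
            return names


# Hypothetical in-memory "server" with three namespaces and a page size of 2.
_ALL = ["default", "dev", "prod"]


def fake_fetch(token: Optional[bytes], page_size: int = 2) -> Page:
    start = int(token) if token else 0
    end = start + page_size
    next_token = str(end).encode() if end < len(_ALL) else None
    return _ALL[start:end], next_token


all_names = list_all(fake_fetch)
```

The loop treats the token as opaque and only checks whether one came back, so the same shape applies whatever the server encodes in it.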
Delete a Namespace: The DeleteNamespace API deletes a Namespace. Deleting a Namespace deletes all running and completed Workflow Executions on the Namespace, and removes them from the persistence store and the visibility store. Example:
//...
client.OperatorService().DeleteNamespace(ctx, &operatorservice.DeleteNamespaceRequest{...
//...
Update information and configuration for a registered Namespace on your Temporal Cluster:
- With tctl: tctl namespace update
- Use the UpdateNamespace API to update configuration on a Namespace. Example:
import io.temporal.api.workflowservice.v1.*;
//...
UpdateNamespaceRequest updateNamespaceRequest = UpdateNamespaceRequest.newBuilder()
    .setNamespace("your-namespace-name") // the namespace that you want to update
    .setUpdateInfo(UpdateNamespaceInfo.newBuilder() // has options to update namespace info
        .setDescription("your updated namespace description") // updates description in the namespace info
        .build())
    .setConfig(NamespaceConfig.newBuilder() // has options to update namespace configuration
        .setWorkflowExecutionRetentionTtl(Durations.fromHours(30)) // updates the retention period for the namespace "your-namespace-name" to 30 hrs
        .build())
    .build();
UpdateNamespaceResponse updateNamespaceResponse = namespaceservice.blockingStub().updateNamespace(updateNamespaceRequest);
//...
Get details for a registered Namespace on your Temporal Cluster:
- With tctl:
tctl namespace describe
- Use the
DescribeNamespace
API to return information and configuration details for a registered Namespace. Example
import io.temporal.api.workflowservice.v1.*;
//...
DescribeNamespaceRequest descNamespace = DescribeNamespaceRequest.newBuilder()
.setNamespace("your-namespace-name") //specify the namespace you want details for
.build();
DescribeNamespaceResponse describeNamespaceResponse = namespaceservice.blockingStub().describeNamespace(descNamespace);
System.out.println("Namespace Description: " + describeNamespaceResponse);
//...
Get details for all registered Namespaces on your Temporal Cluster:
- With tctl:
tctl namespace list
- Use the ListNamespaces API to return information and configuration details for all registered Namespaces on your Temporal Cluster. Example:
import io.temporal.api.workflowservice.v1.*;
//...
ListNamespacesRequest listNamespaces = ListNamespacesRequest.newBuilder().build();
//lists up to 100 namespaces (one page) in the active cluster. To list all, set the page size or loop until the next page token is empty.
ListNamespacesResponse listNamespacesResponse = namespaceservice.blockingStub().listNamespaces(listNamespaces);
//...
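The "loop until the next page token is empty" pattern mentioned for ListNamespaces can be sketched without a running Cluster. This is a minimal illustration, not the SDK's API: `NamespacePager`, `Page`, and the `fetchPage` function are hypothetical stand-ins for a paged call such as `listNamespaces`, and the token is modeled as a plain `String` purely for readability.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class NamespacePager {

    /** Hypothetical page type: a batch of names plus the token for the next page. */
    public static final class Page {
        final List<String> names;
        final String nextPageToken; // empty string means there are no more pages

        public Page(List<String> names, String nextPageToken) {
            this.names = names;
            this.nextPageToken = nextPageToken;
        }
    }

    /**
     * Collects every name by calling fetchPage repeatedly, passing the token
     * returned by the previous page, until the returned token is empty.
     * fetchPage is an assumed stand-in for the real paged list call.
     */
    public static List<String> listAll(Function<String, Page> fetchPage) {
        List<String> all = new ArrayList<>();
        String token = ""; // an empty token requests the first page
        do {
            Page page = fetchPage.apply(token);
            all.addAll(page.names);
            token = page.nextPageToken;
        } while (!token.isEmpty());
        return all;
    }

    public static void main(String[] args) {
        // Fake two-page backend, used only to exercise the loop.
        Function<String, Page> fake = token ->
            token.isEmpty()
                ? new Page(List.of("default", "billing"), "page-2")
                : new Page(List.of("analytics"), "");
        System.out.println(listAll(fake)); // prints [default, billing, analytics]
    }
}
```

Against a real Cluster, the same loop would wrap the `listNamespaces` call, copying the next page token from each response into the following request.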
Deprecate a Namespace: The DeprecateNamespace API updates the state of a registered Namespace to "DEPRECATED". Once a Namespace is deprecated, you cannot start new Workflow Executions on it. All existing and running Workflow Executions on a deprecated Namespace will continue to run. Example:
import io.temporal.api.workflowservice.v1.*;
//...
DeprecateNamespaceRequest deprecateNamespace = DeprecateNamespaceRequest.newBuilder()
.setNamespace("your-namespace-name") //specify the namespace that you want to deprecate
.build();
DeprecateNamespaceResponse response = namespaceservice.blockingStub().deprecateNamespace(deprecateNamespace);
//...
Delete a Namespace: The DeleteNamespace API deletes a Namespace. Deleting a Namespace deletes all running and completed Workflow Executions on the Namespace, and removes them from the persistence store and the visibility store. Example:
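import io.temporal.api.operatorservice.v1.*;
import io.temporal.serviceclient.OperatorServiceStubs;
import io.temporal.serviceclient.OperatorServiceStubsOptions;
//...
//"service" is assumed to be an existing WorkflowServiceStubs instance.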
DeleteNamespaceResponse res =
OperatorServiceStubs.newServiceStubs(OperatorServiceStubsOptions.newBuilder()
.setChannel(service.getRawChannel())
.validateAndBuildWithDefaults())
.blockingStub()
.deleteNamespace(DeleteNamespaceRequest.newBuilder().setNamespace("your-namespace-name").build());
//...
Content is planned but not yet available.
The information you are looking for may be found in the legacy docs.