# How-to guides - Multistep actions
In this segment, we go through the creation of actions that work with asynchronous APIs. When you use an asynchronous API to kick off a long-running job or process in a target application, you typically send a request and receive back an ID that corresponds to that job or process. Your action then needs to check back with the API periodically to see whether the job is complete before retrieving results or moving on to the next step in the recipe.
For example, when you send a request to Google BigQuery to start a query, Google BigQuery sends you back a job ID. Your action then has to check back with Google BigQuery regularly to see whether the query is complete before retrieving the rows.
Rather than having the user configure this logic in the recipe, you can now embed it all into a single "multistep" action on your custom connector. To build multistep actions, the `continue` argument is used in conjunction with a dedicated method called `reinvoke_after`. Read on to see how to configure this!
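Before we dive into a full example, here is a minimal sketch of the pattern. Everything about the target API here (the `api.example.com` endpoints and the `status` values) is a hypothetical placeholder; only the `continue` argument and the `reinvoke_after` method are actual SDK features.

```ruby
execute: lambda do |connection, input, eis, eos, continue|
  if continue.blank? # First invocation: kick off the long-running job
    job = post("https://api.example.com/jobs", input)
    # Sleep, then re-enter this lambda with the job ID preserved in `continue`
    reinvoke_after(seconds: 60, continue: { 'job_id' => job['id'] })
  else # Subsequent invocations: poll until the job completes
    job = get("https://api.example.com/jobs/#{continue['job_id']}")
    if job['status'] == 'running'
      reinvoke_after(seconds: 60, continue: continue)
    else
      job # Job is done; return its data as the action's output
    end
  end
end
```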
Limitation on Workato's cloud debugger
Multistep actions cannot be debugged in Workato's cloud debugger. To debug multistep actions, use our SDK Gem.
# Sample connector - Google BigQuery
```ruby
{
  title: 'My Google BigQuery connector',

  # More connector code here

  actions: {
    query: {
      title: "Execute query",
      subtitle: "Execute query in BigQuery",
      description: "Run Query in BigQuery",
      help: "This query runs synchronously for 25 seconds. If the query takes longer than that, it turns into an asynchronous action. There is a limit of ~38 minutes for the query to complete.",

      input_fields: lambda do
        [
          {
            name: "project_id",
            control_type: 'select',
            pick_list: 'projects',
            optional: false
          },
          {
            name: "query",
            optional: false
          },
          {
            name: 'wait_for_query',
            control_type: 'checkbox',
            sticky: true,
          },
          {
            name: "output_fields",
            extends_schema: true,
            schema_neutral: false,
            sticky: true,
            control_type: 'schema-designer',
            label: 'Output columns',
            hint: 'Provide your output fields for your query if you are providing datapills in your query',
            item_label: 'Design your output columns',
            sample_data_type: 'csv' # json_input / xml
          },
        ]
      end,

      execute: lambda do |connection, input, eis, eos, continue|
        continue = {} unless continue.present?
        current_step = continue['current_step'] || 1
        max_steps = 30
        step_time = current_step * 60 # This helps us wait longer and longer as we increase in steps
        # Minimum step time is 60 seconds

        if current_step == 1 # First invocation
          payload = {
            query: input['query'],
            timeoutMs: '25000',
            useLegacySql: false
          }
          url = "https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries"
          response = post(url, payload)
          # If user wants to wait for query to complete and
          # job isn't complete after 25s
          if response['jobComplete'] == false && input['wait_for_query'].is_true?
            reinvoke_after(
              seconds: step_time,
              continue: {
                current_step: current_step + 1,
                jobid: response['jobReference']['jobId']
              }
            )
          # If user doesn't want to wait for query to complete and
          # job isn't complete after 25s
          elsif response['jobComplete'] == false
            { jobId: response['jobReference']['jobId'] }
          # Job is complete after 25s
          else
            call('format_rows', response)
          end
        # Subsequent invocations
        elsif current_step <= max_steps
          url = "https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/jobs/#{continue['jobid']}"
          response = get(url)
          # If job is still running
          if response['status']['state'] == "RUNNING"
            reinvoke_after(seconds: step_time.to_i, continue: { current_step: current_step + 1, jobid: continue['jobid'] })
          # If status is done but there is an error
          elsif response['status']['state'] == "DONE" && response.dig('status', 'errorResult').present?
            error(response.dig('status', 'errorResult'))
          # If status is done
          else
            results = get("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries/#{continue['jobid']}")
            call('format_rows', results)
          end
        else
          error("Job took too long!")
        end
      end,

      output_fields: lambda do |object_definitions, config_fields|
        [
          { name: "jobId" },
          { name: "totalRows" },
          { name: "pageToken" },
          {
            name: "rows",
            type: "array",
            of: "object",
            properties: object_definitions['query_output']
          }
        ]
      end,

      summarize_output: ['rows']
    },
  },
}
```
# Step 1 - Action title, subtitle, description, and help
The first step to making a good action is to properly communicate what the action does and how it does it, and to provide additional help to users. To do so, Workato allows you to define the title, subtitle, and description of an action, as well as its help text. Quite simply, the title and subtitle give a short summary of the action, the description explains what the action accomplishes in the context of the application it connects to, and the help segment provides users with any additional information required to make the action work.
To learn more about this step, take a look at our SDK reference.
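For reference, here are the corresponding keys from the sample connector above:

```ruby
query: {
  title: "Execute query",
  subtitle: "Execute query in BigQuery",
  description: "Run Query in BigQuery",
  help: "This query runs synchronously for 25 seconds. If the query takes longer than that, it turns into an asynchronous action. There is a limit of ~38 minutes for the query to complete.",
  # ...
}
```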
# Step 2 - Define input fields
```ruby
input_fields: lambda do
  [
    {
      name: "project_id",
      control_type: 'select',
      pick_list: 'projects',
      optional: false
    },
    {
      name: "query",
      optional: false
    },
    {
      name: 'wait_for_query',
      control_type: 'checkbox',
      sticky: true,
    },
    {
      name: "output_fields",
      extends_schema: true,
      schema_neutral: false,
      sticky: true,
      control_type: 'schema-designer',
      label: 'Output columns',
      hint: 'Provide your output fields for your query if you are providing datapills in your query',
      item_label: 'Design your output columns',
      sample_data_type: 'csv' # json_input / xml
    },
  ]
end,
```
This component tells Workato what fields to show to a user trying to execute the multistep action. In the case of executing a query in BigQuery, for example, the user has to provide the following:
- Google BigQuery GCP project ID
- The query to execute in Google BigQuery
- Whether the action should wait for the query to complete
- And the output columns expected from the query
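Note that `pick_list: 'projects'` refers to a picklist that must be defined elsewhere in your connector; its definition isn't shown in this guide. A minimal sketch, assuming the connection is authorized to call BigQuery's `projects.list` endpoint, might look like this:

```ruby
pick_lists: {
  # Maps each BigQuery project to a [label, value] pair for the select field
  projects: lambda do |connection|
    get("https://bigquery.googleapis.com/bigquery/v2/projects")['projects']&.map do |project|
      [project['friendlyName'], project.dig('projectReference', 'projectId')]
    end
  end
}
```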
# Step 3 - Defining the execute key
The `execute` key tells Workato which endpoint to send the request to and which HTTP method to use, and it controls the entire logic of how this action interacts with the asynchronous API. When configuring multistep actions, you need to utilize the `continue` argument together with the `reinvoke_after` method. This allows the first invocation of the `execute` lambda function to insert the query in Google BigQuery and, optionally, put the job to sleep, to be woken up later to check whether the query is done.
When the job is woken up, the `execute` lambda function is invoked again with the `continue` hash passed to it from the previous `reinvoke_after` call. This `continue` hash should contain the job ID of the Google BigQuery job that was created. We then check whether the job is done: if it is still running, we put the job to sleep again; if it is done, we retrieve the results and send them as the output of the action.
TIP
Step time must be set to a minimum of 60 seconds. If anything lower is supplied, Workato defaults to 60 seconds.
```ruby
execute: lambda do |connection, input, eis, eos, continue|
  continue = {} unless continue.present? # For the first invocation, continue is nil
  current_step = continue['current_step'] || 1 # Instantiate current_step so we know what step we are on
  max_steps = 30 # IMPORTANT: set a maximum number of steps so your action doesn't continue forever, which would degrade the performance of your recipes
  step_time = current_step * 60 # This helps us wait longer and longer as we increase in steps
  # Minimum step time is 60 seconds

  if current_step == 1 # First invocation
    payload = {
      query: input['query'],
      timeoutMs: '25000',
      useLegacySql: false
    }
    # The request below sends the query to BigQuery
    response = post("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries", payload)
    # If the user wants to wait for the query to complete and
    # the job isn't complete after 25s
    if response['jobComplete'] == false && input['wait_for_query'].is_true?
      # reinvoke_after accepts 2 arguments:
      # - seconds: an integer that tells us how long to put the job to sleep for (minimum 60 seconds)
      # - continue: a hash that is passed to the next invocation of the execute block when the job is woken up
      reinvoke_after(seconds: step_time, continue: { current_step: current_step + 1, jobid: response['jobReference']['jobId'] })
    # If the user doesn't want to wait, return the jobId so they can retrieve the results manually
    elsif response['jobComplete'] == false
      { jobId: response['jobReference']['jobId'] }
    else
      call('format_rows', response)
    end
  elsif current_step <= max_steps # Subsequent invocations
    response = get("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/jobs/#{continue['jobid']}")
    # If job is still running, put it to sleep again
    if response['status']['state'] == "RUNNING"
      reinvoke_after(seconds: step_time.to_i, continue: { current_step: current_step + 1, jobid: continue['jobid'] })
    # If the job is done but there was an error, raise an error
    elsif response['status']['state'] == "DONE" && response.dig('status', 'errorResult').present?
      error(response.dig('status', 'errorResult'))
    # Reaching here means the job is done and there are results
    else
      results = get("https://bigquery.googleapis.com/bigquery/v2/projects/#{input['project_id']}/queries/#{continue['jobid']}")
      call('format_rows', results)
    end
  # If we exceed max_steps (step 31), raise an error to cancel the job
  else
    error("Job took too long!")
  end
end,
```
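Note that `call('format_rows', response)` invokes a reusable method that must be defined under the connector's `methods` key; its implementation isn't shown in this guide. A minimal sketch, assuming BigQuery's standard query response format (each row is a hash with an `f` array of cell values, and column names come from `schema.fields`), might look like this:

```ruby
methods: {
  # Hypothetical helper: flattens BigQuery's rows/schema response format
  # into an array of { column_name => value } hashes for the output datapills.
  format_rows: lambda do |response|
    column_names = response.dig('schema', 'fields')&.map { |field| field['name'] } || []
    rows = (response['rows'] || []).map do |row|
      row_hash = {}
      row['f'].each_with_index do |cell, index|
        row_hash[column_names[index]] = cell['v']
      end
      row_hash
    end
    {
      jobId: response.dig('jobReference', 'jobId'),
      totalRows: response['totalRows'],
      pageToken: response['pageToken'],
      rows: rows
    }
  end
}
```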
# Step 4 - Defining output fields
This section tells us what datapills to show as the output of the action. The `name` attributes of each datapill should match the keys in the output of the `execute` key.
```ruby
output_fields: lambda do |object_definitions, config_fields|
  [
    { name: "jobId" },
    { name: "totalRows" },
    { name: "pageToken" },
    {
      name: "rows",
      type: "array",
      of: "object",
      properties: object_definitions['query_output']
    }
  ]
end,
```
Object definitions
In this example, we make use of the `output_fields` given to us by the user in the input fields. Here is the object definition of `query_output`:
```ruby
query_output: {
  fields: lambda do |connection, config_fields, object_definitions|
    next if config_fields['output_fields'].blank?
    parse_json(config_fields['output_fields'])
  end
}
```
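To illustrate, the schema-designer control stores the user's column design as a JSON string. If a user defined two columns, `config_fields['output_fields']` might hold something along these lines (a hypothetical example), which `parse_json` converts into the schema array used as the `properties` of the `rows` datapill:

```ruby
# Hypothetical contents of config_fields['output_fields']:
output_fields_json = '[{"name":"id","type":"integer"},{"name":"email"}]'

# parse_json turns the string into Workato schema:
# [{ "name" => "id", "type" => "integer" }, { "name" => "email" }]
parse_json(output_fields_json)
```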