Writing a Workflow File
To use `spockctrl` to make an automated change to your cluster, you need to describe the changes in a workflow file. A workflow file describes the changes you'll be making to your cluster in a step-by-step, node-by-node manner, and needs to be customized for your cluster.
The new node should not be accessible to users while adding a node with a workflow.
Do not modify your cluster's DDL during node addition.
All nodes in your cluster must be available for the duration of the addition.
If the workflow fails, do not invoke it again until you have confirmed that all artifacts created by the previous run have been removed!
Sample workflows for common activities are available in the Spock extension's GitHub repository; the samples can help you:
- `add_node.json` - Add a node to a cluster.
- `remove_node.json` - Remove a node from a cluster.
- `cross-wire.json` - Configure subscriptions and replication artifacts between empty nodes.
- `uncross-wire.json` - Remove subscriptions and replication artifacts from empty nodes.
To execute a workflow, include the `-w` (or `--workflow=<path_to_workflow_file>`) command-line option when you invoke `spockctrl`, followed by the path to the workflow JSON file.
spockctrl --config=/path/to/my/spockctrl.json --workflow=/path/to/my/workflow.json
or
spockctrl -c path/to/my/spockctrl.json -w path/to/my/workflow.json
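If you launch workflows from a script rather than by hand, the same two options can be assembled programmatically. The following is a minimal Python sketch, not part of `spockctrl` itself: the helper name `build_spockctrl_command` is hypothetical, and it assumes the `spockctrl` executable is on your `PATH`. Only the `--config` and `--workflow` options shown above are used.

```python
import shlex


def build_spockctrl_command(config_path, workflow_path, executable="spockctrl"):
    """Return the argument list for running a single workflow file.

    Only the --config and --workflow options described above are used;
    the function name and default executable name are illustrative.
    """
    return [
        executable,
        f"--config={config_path}",
        f"--workflow={workflow_path}",
    ]


if __name__ == "__main__":
    argv = build_spockctrl_command(
        "/path/to/my/spockctrl.json", "/path/to/my/workflow.json"
    )
    # Print a copy-pasteable command line. To execute it instead, pass the
    # list to subprocess.run(argv, check=True).
    print(shlex.join(argv))
```

Building an argument list (rather than a single string) avoids shell quoting problems if a path contains spaces.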
Example - Workflow to Add a Node to a Two-Node Cluster
In this example, we'll walk through the stanzas that make up a workflow that adds a new node to a two-node cluster. Within the workflow file, the `COMMAND` property identifies the action performed by the stanza in which it is used. Spock 5.0 supports the following `COMMAND`s:
COMMAND | Description |
---|---|
CREATE NODE | Add a node to a cluster. |
DROP NODE | Drop a node from a cluster. |
CREATE SUBSCRIPTION | Add a subscription to a cluster. |
DROP SUBSCRIPTION | Drop a subscription from a cluster. |
CREATE REPSET | Add a repset to a cluster. |
DROP REPSET | Drop a repset from a cluster. |
CREATE SLOT | Add a replication slot. |
DROP SLOT | Drop a replication slot. |
ENABLE SUBSCRIPTION | Start replication on a node. |
DISABLE SUBSCRIPTION | Stop replication on a node. |
SQL | Invoke the specified Postgres SQL command. |
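Before running a hand-edited workflow, it can be useful to confirm that every stanza uses one of the commands in the table above. The following Python sketch is one way to do that. It assumes the file layout used in the sample workflow (a top-level `steps` array in which each step wraps one stanza under a single key such as `spock` or `sql`); the function name `unsupported_commands` is hypothetical.

```python
import json

# Commands copied from the table above.
SUPPORTED_COMMANDS = {
    "CREATE NODE", "DROP NODE",
    "CREATE SUBSCRIPTION", "DROP SUBSCRIPTION",
    "CREATE REPSET", "DROP REPSET",
    "CREATE SLOT", "DROP SLOT",
    "ENABLE SUBSCRIPTION", "DISABLE SUBSCRIPTION",
    "SQL",
}


def unsupported_commands(workflow):
    """Return (step_index, command) pairs whose command is not in the table."""
    problems = []
    for index, step in enumerate(workflow.get("steps", [])):
        # Each step is assumed to hold one stanza under a key like "spock" or "sql".
        for stanza in step.values():
            command = stanza.get("command")
            if command not in SUPPORTED_COMMANDS:
                problems.append((index, command))
    return problems


if __name__ == "__main__":
    sample = json.loads(
        '{"steps": [{"spock": {"command": "CREATE NODE"}},'
        ' {"sql": {"command": "VACUUM"}}]}'
    )
    print(unsupported_commands(sample))
```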
In this walkthrough, we're using a two-node cluster; if your cluster is larger than two nodes, any actions performed on the replica node (in our example, `n2`) should be performed on every replica node in your cluster. A replica node is any existing node that is not used as a source node.
Our sample workflow adds a third node to a two-node cluster. The first stanza provides connection information for the host of the new node. Spockctrl can add only one node per workflow file; to add several nodes, use a separate workflow file for each node and provide this information in each file.
This stanza associates the name of the new node with the connection properties of the new node:
- Provide the new node name in the `node` property and the `--node_name` property - the name must be identical.
- `--dsn` specifies the connection properties of the new node.
{
"workflow_name": "Add Node",
"description": "Adding third node (n3) to two node (n1,n2) cluster.",
"steps": [
{
"spock": {
"node": "n3",
"command": "CREATE NODE",
"description": "Create a spock node n3",
"args": [
"--node_name=n3",
"--dsn=host=127.0.0.1 port=5433 user=pgedge password=pgedge",
"--location=Los Angeles",
"--country=USA",
"--info={\"key\": \"value\"}"
],
"on_success": {},
"on_failure": {}
}
},
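Because the `node` property and the `--node_name` argument must be identical, a small check can catch a mismatch before the workflow runs. The sketch below is illustrative only: the helper names are hypothetical, and it assumes arguments are written as `--key=value` strings, as in the sample stanza. Splitting on the first `=` only matters here, because the `--dsn` value itself contains `=` characters.

```python
def parse_args(args):
    """Turn ["--key=value", ...] into a dict, splitting on the first '=' only."""
    parsed = {}
    for arg in args:
        key, _, value = arg.partition("=")
        parsed[key.lstrip("-")] = value
    return parsed


def node_name_matches(stanza):
    """True when the stanza's node property equals its --node_name argument."""
    return stanza["node"] == parse_args(stanza["args"]).get("node_name")


if __name__ == "__main__":
    create_node = {
        "node": "n3",
        "command": "CREATE NODE",
        "args": [
            "--node_name=n3",
            "--dsn=host=127.0.0.1 port=5433 user=pgedge password=pgedge",
        ],
    }
    print(node_name_matches(create_node))
```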
The next stanza creates the subscription from the new node (`n3`) to the source node (`n1`). In this stanza:
- the `node` property specifies the name of the node in our original cluster that is used as our source node. The content of this node will be copied to the new node that we are creating.
- `--sub_name` specifies the name of the new subscription.
- `--provider_dsn` specifies the connection properties of the provider (our new node).
- `--replication_sets` specifies the names of the replication sets created for the subscription.
- `--enabled`, `--synchronize_data`, and `--synchronize_structure` must be `true`.
{
"spock": {
"node": "n1",
"command": "CREATE SUBSCRIPTION",
"description": "Create a subscription (sub_n3_n1) on (n1) for n3->n1",
"sleep": 0,
"args": [
"--sub_name=sub_n3_n1",
"--provider_dsn=host=127.0.0.1 port=5433 user=pgedge password=spockpass",
"--replication_sets=ARRAY['default', 'default_insert_only', 'ddl_sql']",
"--synchronize_structure=true",
"--synchronize_data=true",
"--forward_origins='{}'::text[]",
"--apply_delay='0'::interval",
"--force_text_transfer=false",
"--enabled=true"
],
"on_success": {},
"on_failure": {}
}
},
Our next stanza creates subscriptions between the new node and any existing replica nodes.
- `node` specifies the name of the existing replica node.
- `--provider_dsn` specifies the connection properties of the new node; this is the provider node for the new subscription.
- `--replication_sets` specifies the names of the replication sets created for the subscription.
- `--enabled`, `--synchronize_data`, and `--synchronize_structure` must be `true`.

You will need to include this stanza once for each replica node in your cluster; if your existing cluster has three nodes (one source node and two replica nodes), you will add two copies of this stanza. If your existing cluster has four nodes (one source node and three replica nodes), you will add three copies of this stanza.
{
"spock": {
"node": "n2",
"command": "CREATE SUBSCRIPTION",
"description": "Create a subscription (sub_n3_n2) on (n2) for n3->n2",
"sleep": 0,
"args": [
"--sub_name=sub_n3_n2",
"--provider_dsn=host=127.0.0.1 port=5433 user=pgedge password=spockpass",
"--replication_sets=ARRAY['default', 'default_insert_only', 'ddl_sql']",
"--synchronize_structure=true",
"--synchronize_data=true",
"--forward_origins='{}'::text[]",
"--apply_delay='0'::interval",
"--force_text_transfer=false",
"--enabled=true"
],
"ignore_errors": false,
"on_success": {},
"on_failure": {}
}
},
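Since this stanza is repeated once per replica node, larger clusters may find it easier to generate the copies than to edit them by hand. The following Python sketch builds one stanza per replica using the same argument values as the sample above. The function name and the `sub_<new>_<replica>` naming pattern are taken from the sample or are hypothetical; adjust the DSN and replication sets for your own cluster.

```python
import json


def replica_subscription_stanza(replica, new_node, new_node_dsn):
    """Build a CREATE SUBSCRIPTION stanza on `replica` with `new_node` as provider."""
    sub_name = f"sub_{new_node}_{replica}"
    return {
        "spock": {
            "node": replica,
            "command": "CREATE SUBSCRIPTION",
            "description": (
                f"Create a subscription ({sub_name}) on ({replica}) "
                f"for {new_node}->{replica}"
            ),
            "sleep": 0,
            "args": [
                f"--sub_name={sub_name}",
                f"--provider_dsn={new_node_dsn}",
                "--replication_sets=ARRAY['default', 'default_insert_only', 'ddl_sql']",
                "--synchronize_structure=true",
                "--synchronize_data=true",
                "--forward_origins='{}'::text[]",
                "--apply_delay='0'::interval",
                "--force_text_transfer=false",
                "--enabled=true",
            ],
            "ignore_errors": False,
            "on_success": {},
            "on_failure": {},
        }
    }


if __name__ == "__main__":
    # Example: adding n4 to a cluster whose replica nodes are n2 and n3.
    dsn = "host=127.0.0.1 port=5434 user=pgedge password=spockpass"
    stanzas = [replica_subscription_stanza(r, "n4", dsn) for r in ["n2", "n3"]]
    print(json.dumps(stanzas, indent=2))
```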
In the next step, we wait for the apply worker to check the state of the subscription. This step is optional, but should be considered a best practice. This step uses a SQL command to call the `spock.wait_for_apply_worker` function.
- `$n2.sub_create` is a variable (the subscription ID) populated by the previous `CREATE SUBSCRIPTION` stanza.
- If needed, use the `sleep` property to accommodate processing time.

Note that if you have more than one replica node, and multiple `CREATE SUBSCRIPTION` stanzas, each stanza should be followed by a copy of this stanza, with the variable reset for each subsequent execution (`$n3.sub_create`, `$n4.sub_create`, etc.).
{
"sql": {
"node": "n2",
"command": "SQL",
"description": "Wait for apply worker on n2 subscription",
"sleep": 0,
"args": [
"--sql=SELECT spock.wait_for_apply_worker($n2.sub_create, 1000);"
],
"on_success": {},
"on_failure": {}
}
},
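When there are several replica nodes, each replica's `CREATE SUBSCRIPTION` stanza needs its own wait stanza with the matching variable. The sketch below pairs them up automatically. It is an illustration under the assumptions of the sample workflow: the `$<node>.sub_create` variable name and the `1000` argument are copied from the stanza above, and the helper names are hypothetical.

```python
def wait_for_apply_worker_stanza(replica):
    """Build the optional wait stanza for the subscription created on `replica`."""
    return {
        "sql": {
            "node": replica,
            "command": "SQL",
            "description": f"Wait for apply worker on {replica} subscription",
            "sleep": 0,
            "args": [
                # $<node>.sub_create follows the variable naming in the sample.
                f"--sql=SELECT spock.wait_for_apply_worker(${replica}.sub_create, 1000);"
            ],
            "on_success": {},
            "on_failure": {},
        }
    }


def with_apply_worker_waits(replica_subscription_steps):
    """Return a new step list in which each subscription step is followed by its wait."""
    steps = []
    for step in replica_subscription_steps:
        steps.append(step)
        steps.append(wait_for_apply_worker_stanza(step["spock"]["node"]))
    return steps


if __name__ == "__main__":
    subs = [
        {"spock": {"node": "n2", "command": "CREATE SUBSCRIPTION"}},
        {"spock": {"node": "n3", "command": "CREATE SUBSCRIPTION"}},
    ]
    for step in with_apply_worker_waits(subs):
        stanza = next(iter(step.values()))
        print(stanza["node"], stanza["command"])
```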
In our next stanza, we create a subscription between each replica node (`n2`) and our new node (`n3`).
- `--provider_dsn` is the connection string of the replica node (in our example, the provider is the first node referenced in our subscription).
- `--enabled`, `--synchronize_data`, `--force_text_transfer`, and `--synchronize_structure` must be `false`.

If you have multiple replica nodes, you will need one iteration of this stanza for each replica node in your cluster.
{
"spock": {
"node": "n3",
"command": "CREATE SUBSCRIPTION",
"description": "Create a subscription (sub_n2_n3) on (n3) for n2->n3",
"sleep": 5,
"args": [
"--sub_name=sub_n2_n3",
"--provider_dsn=host=127.0.0.1 port=5432 user=pgedge password=spockpass",
"--replication_sets=ARRAY['default', 'default_insert_only', 'ddl_sql']",
"--synchronize_structure=false",
"--synchronize_data=false",
"--forward_origins='{}'::text[]",
"--apply_delay='0'::interval",
"--force_text_transfer=false",
"--enabled=false"
],
"on_success": {},
"on_failure": {}
}
},
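Because this subscription must be created in a disabled, non-synchronizing state, it is worth checking the four flags before running the workflow. The following Python sketch reports any of those flags that are missing or not set to `false`. The helper names are hypothetical, and the check assumes the `--key=value` argument style used in the sample stanza.

```python
# Flags that must be false on the replica -> new node subscription.
REQUIRED_FALSE = (
    "enabled",
    "synchronize_data",
    "synchronize_structure",
    "force_text_transfer",
)


def flag_values(stanza, names):
    """Return {name: value} for the requested --name=value arguments (None if absent)."""
    parsed = {}
    for arg in stanza["args"]:
        key, _, value = arg.partition("=")
        parsed[key.lstrip("-")] = value
    return {name: parsed.get(name) for name in names}


def flags_not_false(stanza):
    """List the required flags that are missing or set to something other than false."""
    values = flag_values(stanza, REQUIRED_FALSE)
    return [name for name, value in values.items() if value != "false"]


if __name__ == "__main__":
    stanza = {
        "node": "n3",
        "command": "CREATE SUBSCRIPTION",
        "args": [
            "--sub_name=sub_n2_n3",
            "--synchronize_structure=false",
            "--synchronize_data=false",
            "--force_text_transfer=false",
            "--enabled=true",
        ],
    }
    print(flags_not_false(stanza))
```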
In the next stanza, we use a `CREATE SLOT` command to create a replication slot for the new subscription between our existing replica (`n2`) and our new node (`n3`). Provide the slot name in the form `spk_database-name_node-name_subscription-name` where:
- `database-name` is the name of your database.
- `node-name` is the name of the existing replica node.
- `subscription-name` is the name of the subscription created in the previous step.

You must create one replication slot for each iteration of the CREATE SUBSCRIPTION stanza that allows a replica node to communicate with the new node; if you have three nodes in your cluster (one source node, and two replica nodes), you will need to provide one slot for each replica node (or two slots total).
{
"spock": {
"node": "n2",
"command": "CREATE SLOT",
"description": "Create a logical replication slot spk_pgedge_n2_sub_n2_n3 on (n2)",
"args": [
"--slot=spk_pgedge_n2_sub_n2_n3",
"--plugin=spock_output"
],
"on_success": {},
"on_failure": {}
}
},
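The slot naming convention is easy to get wrong by hand, so the following Python sketch composes the name from its three parts and builds the matching stanza. The helper names are hypothetical. The character and length check reflects PostgreSQL's general rule that replication slot names may contain only lower-case letters, numbers, and underscores, and assumes the default 63-character identifier limit.

```python
import re

# PostgreSQL slot names: lower-case letters, digits, underscore; 63 chars by default.
SLOT_NAME_PATTERN = re.compile(r"[a-z0-9_]{1,63}")


def slot_name(database, node, subscription):
    """Compose spk_<database>_<node>_<subscription> and validate the result."""
    name = f"spk_{database}_{node}_{subscription}"
    if not SLOT_NAME_PATTERN.fullmatch(name):
        raise ValueError(f"not a valid replication slot name: {name!r}")
    return name


def create_slot_stanza(database, replica, new_node):
    """Build a CREATE SLOT stanza on `replica` for the replica -> new node subscription."""
    # sub_<replica>_<new_node> follows the subscription naming in the sample.
    name = slot_name(database, replica, f"sub_{replica}_{new_node}")
    return {
        "spock": {
            "node": replica,
            "command": "CREATE SLOT",
            "description": f"Create a logical replication slot {name} on ({replica})",
            "args": [f"--slot={name}", "--plugin=spock_output"],
            "on_success": {},
            "on_failure": {},
        }
    }


if __name__ == "__main__":
    print(create_slot_stanza("pgedge", "n2", "n3")["spock"]["args"])
```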
Next, on each replica node, we invoke a SQL command that starts a sync event between the existing node (`n2`) and our source node (`n1`). This ensures that our replica nodes stay in sync with the source node for the duration of the ADD NODE process.
The function (`spock.sync_event`) returns the log sequence number (LSN) of the sync event:
{
"sql": {
"node": "n2",
"command": "SQL",
"description": "Trigger a sync event on (n2)",
"sleep": 10,
"args": [
"--sql=SELECT spock.sync_event();"
],
"on_success": {},
"on_failure": {}
}
},
The previous stanza returns the LSN of the sync event in the `$n2.sync_event` variable; in the next stanza, we watch for that variable so we know when the step is complete.
- `node` specifies the cluster source node (in our example, `n1`).
- `$n2.sync_event` is the LSN returned by the previous stanza.
- If needed, you can use `sleep` to provide extra processing time for the step.

Include this stanza once for each iteration of the previous stanza within your workflow file.
{
"sql": {
"node": "n1",
"command": "SQL",
"description": "Wait for a sync event on (n1) for n2-n1",
"sleep": 0,
"args": [
"--sql=CALL spock.wait_for_sync_event(true, 'n2', '$n2.sync_event'::pg_lsn, 1200000);"
],
"on_success": {},
"on_failure": {}
}
},
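Variables such as `$n2.sync_event` only make sense if a step on the named node ran earlier in the workflow. The following Python sketch scans a workflow for `$node.variable` references and reports any that appear before a step on that node. It is a rough lint under a stated assumption (that a variable is populated by an earlier step on the node named in its prefix, as in the sample); it does not reproduce how `spockctrl` resolves variables, and the function name is hypothetical.

```python
import re

# Matches references such as $n2.sync_event or $n3.commit_timestamp.
VARIABLE = re.compile(r"\$(\w+)\.(\w+)")


def unresolved_references(workflow):
    """Return (step_index, reference) pairs used before any step on that node."""
    seen_nodes = set()
    unresolved = []
    for index, step in enumerate(workflow.get("steps", [])):
        for stanza in step.values():
            for arg in stanza.get("args", []):
                for node, name in VARIABLE.findall(arg):
                    if node not in seen_nodes:
                        unresolved.append((index, f"${node}.{name}"))
            # Record the node only after checking this step's own arguments.
            seen_nodes.add(stanza.get("node"))
    return unresolved


if __name__ == "__main__":
    workflow = {
        "steps": [
            {"sql": {"node": "n2", "args": ["--sql=SELECT spock.sync_event();"]}},
            {"sql": {"node": "n1", "args": [
                "--sql=CALL spock.wait_for_sync_event(true, 'n2', "
                "'$n2.sync_event'::pg_lsn, 1200000);"
            ]}},
        ]
    }
    print(unresolved_references(workflow))
```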
In the next stanza, we create a subscription on our new target node (`n3`) from the source node (`n1`):
- `--sub_name` is `sub_n1_n3`.
- `--provider_dsn` is the connection string for the source node (our provider, `n1`).
- `--enabled`, `--synchronize_data`, and `--synchronize_structure` must be `true`.
- `--force_text_transfer` must be `false`.

{
"spock": {
"node": "n3",
"command": "CREATE SUBSCRIPTION",
"description": "Create a subscription (sub_n1_n3) on (n3) for n1->n3",
"sleep": 0,
"args": [
"--sub_name=sub_n1_n3",
"--provider_dsn=host=127.0.0.1 port=5431 user=pgedge password=spockpass",
"--replication_sets=ARRAY['default', 'default_insert_only', 'ddl_sql']",
"--synchronize_structure=true",
"--synchronize_data=true",
"--forward_origins='{}'::text[]",
"--apply_delay='0'::interval",
"--force_text_transfer=false",
"--enabled=true"
],
"on_success": {},
"on_failure": {}
}
},
Then, we include a stanza that triggers a sync event on the source node (`n1`) between the source node and the new node (`n3`). Use the `sleep` property to allocate time for the data to sync to the new node if needed.
{
"sql": {
"node": "n1",
"command": "SQL",
"description": "Trigger a sync event on (n1)",
"sleep": 5,
"args": [
"--sql=SELECT spock.sync_event();"
],
"on_success": {},
"on_failure": {}
}
},
In the next stanza, we wait for the sync event started in the previous stanza to complete. Use the `sleep` property to allocate time for the data to sync to the new node if needed.
{
"sql": {
"node": "n3",
"command": "SQL",
"description": "Wait for a sync event on (n3) for n1-n3",
"sleep": 10,
"args": [
"--sql=CALL spock.wait_for_sync_event(true, 'n1', '$n1.sync_event'::pg_lsn, 1200000);"
],
"on_success": {},
"on_failure": {}
}
},
In the next stanza, we check for data lag between the new node (n3) and any replica nodes in our cluster (n2). The timestamp returned is passed to the next stanza for use in evaluating the comparative state of the replica nodes and our new node.
{
"sql": {
"node": "n3",
"command": "SQL",
"description": "Check commit timestamp for n3 lag",
"sleep": 1,
"args": [
"--sql=SELECT commit_timestamp FROM spock.lag_tracker WHERE origin_name = 'n2' AND receiver_name = 'n3'"
],
"on_success": {},
"on_failure": {}
}
},
In the next stanza, we use the timestamp from the previous stanza (`$n3.commit_timestamp`) to advance our replica slot to that location within our log files. This effectively advances transactions to the time specified (preventing duplicate entries from being written to the replica node).
{
"sql": {
"node": "n2",
"command": "SQL",
"description": "Advance the replication slot for n2->n3 based on a specific commit timestamp",
"sleep": 0,
"args": [
"--sql=WITH lsn_cte AS (SELECT spock.get_lsn_from_commit_ts('spk_pgedge_n2_sub_n2_n3', '$n3.commit_timestamp'::timestamp) AS lsn) SELECT pg_replication_slot_advance('spk_pgedge_n2_sub_n2_n3', lsn) FROM lsn_cte;"
],
"on_success": {},
"on_failure": {}
}
},
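The slot name appears twice in this SQL statement, and both occurrences must match the slot created earlier. The Python sketch below composes the statement from a slot name and a timestamp variable so the two stay in step. The function name is hypothetical; the SQL text itself is copied from the stanza above.

```python
def advance_slot_sql(slot, timestamp_variable):
    """Return the SQL that advances `slot` to the LSN for the given commit timestamp."""
    return (
        "WITH lsn_cte AS (SELECT spock.get_lsn_from_commit_ts("
        f"'{slot}', '{timestamp_variable}'::timestamp) AS lsn) "
        f"SELECT pg_replication_slot_advance('{slot}', lsn) FROM lsn_cte;"
    )


def advance_slot_stanza(replica, new_node, slot):
    """Build the SQL stanza that runs on `replica` for the replica -> new node slot."""
    # $<new_node>.commit_timestamp follows the variable naming in the sample.
    sql = advance_slot_sql(slot, f"${new_node}.commit_timestamp")
    return {
        "sql": {
            "node": replica,
            "command": "SQL",
            "description": (
                f"Advance the replication slot for {replica}->{new_node} "
                "based on a specific commit timestamp"
            ),
            "sleep": 0,
            "args": [f"--sql={sql}"],
            "on_success": {},
            "on_failure": {},
        }
    }


if __name__ == "__main__":
    print(advance_slot_stanza("n2", "n3", "spk_pgedge_n2_sub_n2_n3")["sql"]["args"][0])
```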
Then, we enable the subscription from each replica node to the new node. At this point replication starts between any existing node and the new node (`n3`).
{
"spock": {
"node": "n3",
"command": "ENABLE SUBSCRIPTION",
"description": "Enable subscription (sub_n2_n3) on n3",
"args": [
"--sub_name=sub_n2_n3",
"--immediate=true"
],
"on_success": {},
"on_failure": {}
}
},
After starting replication, we check the lag time between the new node and each existing node in the cluster. This step invokes a SQL command that repeatedly queries `spock.lag_tracker` for the lag from each existing node to the new node, and exits once every lag value is available and under 59 seconds. Use the `sleep` property to extend processing time if needed.
{
"sql": {
"node": "n3",
"command": "SQL",
"description": "Wait until replication lag from n1 and n2 to n3 is under 59 seconds",
"sleep": 0,
"args": [
"--sql=DO $$\nDECLARE\n lag_n1_n3 interval;\n lag_n2_n3 interval;\nBEGIN\n LOOP\n SELECT now() - commit_timestamp INTO lag_n1_n3\n FROM spock.lag_tracker\n WHERE origin_name = 'n1' AND receiver_name = 'n3';\n\n SELECT now() - commit_timestamp INTO lag_n2_n3\n FROM spock.lag_tracker\n WHERE origin_name = 'n2' AND receiver_name = 'n3';\n\n RAISE NOTICE 'n1 → n3 lag: %, n2 → n3 lag: %',\n COALESCE(lag_n1_n3::text, 'NULL'),\n COALESCE(lag_n2_n3::text, 'NULL');\n\n EXIT WHEN lag_n1_n3 IS NOT NULL AND lag_n2_n3 IS NOT NULL\n AND extract(epoch FROM lag_n1_n3) < 59\n AND extract(epoch FROM lag_n2_n3) < 59;\n\n PERFORM pg_sleep(1);\n END LOOP;\nEND\n$$;\n"
],
"on_success": {},
"on_failure": {}
}
}
]
}
The SQL command from the last step (in a more readable format) is:
DO $$
DECLARE
    lag_n1_n3 interval;
    lag_n2_n3 interval;
BEGIN
    LOOP
        SELECT now() - commit_timestamp INTO lag_n1_n3
        FROM spock.lag_tracker
        WHERE origin_name = 'n1' AND receiver_name = 'n3';

        SELECT now() - commit_timestamp INTO lag_n2_n3
        FROM spock.lag_tracker
        WHERE origin_name = 'n2' AND receiver_name = 'n3';

        RAISE NOTICE 'n1 -> n3 lag: %, n2 -> n3 lag: %',
            COALESCE(lag_n1_n3::text, 'NULL'),
            COALESCE(lag_n2_n3::text, 'NULL');

        EXIT WHEN lag_n1_n3 IS NOT NULL AND lag_n2_n3 IS NOT NULL
            AND extract(epoch FROM lag_n1_n3) < 59
            AND extract(epoch FROM lag_n2_n3) < 59;

        PERFORM pg_sleep(1);
    END LOOP;
END
$$;
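The block above hard-codes two origin nodes; a larger cluster needs one lag variable, one query, and one exit condition per existing node. The following Python sketch generates an equivalent block for any list of origin nodes. It is a simplified illustration: the function name is hypothetical, the `RAISE NOTICE` line is omitted for brevity, and node names are assumed to be simple identifiers (letters, digits, underscores) because they are used in PL/pgSQL variable names.

```python
def lag_check_sql(new_node, origins, max_lag_seconds=59):
    """Generate a DO block that loops until lag from every origin is below the limit."""
    declares = "\n".join(f"    lag_{o}_{new_node} interval;" for o in origins)
    selects = "\n".join(
        f"        SELECT now() - commit_timestamp INTO lag_{o}_{new_node}\n"
        f"        FROM spock.lag_tracker\n"
        f"        WHERE origin_name = '{o}' AND receiver_name = '{new_node}';"
        for o in origins
    )
    conditions = "\n            AND ".join(
        f"lag_{o}_{new_node} IS NOT NULL "
        f"AND extract(epoch FROM lag_{o}_{new_node}) < {max_lag_seconds}"
        for o in origins
    )
    return (
        "DO $$\nDECLARE\n"
        + declares
        + "\nBEGIN\n    LOOP\n"
        + selects
        + "\n        EXIT WHEN "
        + conditions
        + ";\n        PERFORM pg_sleep(1);\n    END LOOP;\nEND\n$$;\n"
    )


if __name__ == "__main__":
    # Example: new node n4 joining a cluster with existing nodes n1, n2, and n3.
    print(lag_check_sql("n4", ["n1", "n2", "n3"]))
```

The generated string can be placed after `--sql=` in the `args` array of the final stanza; when the workflow is written with a JSON serializer, the newlines are escaped automatically.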