A remediation action is a Directed Acyclic Graph (DAG) that uses Python.
- Each action is composed of one or more tasks.
- Each task uses a particular operator. For example, HTTP/ssh/slack/python for connecting to external services.
- The tasks for HTTP/ssh/slack can refer to the Connections when connecting to an external service. For example, for a HTTP GET/POST operation.
- The python-operator task is used for writing a Python function that works on the data received from or to is sent to external services.
- You must specify the paths for the operators and Python modules in the import section of the Python action file.
- You must import the success and failure callbacks.
import json from datetime import datetime import logging import urllib.parse from airflow.models import DAG from airflow.models.param import Param from airflow.providers.http.operators.http import SimpleHttpOperator from airflow.operators.python import PythonOperator from Success import * from Failure import *
DAG definition calls must supply the parameters required by the action.
myParams = { "notification_obj": {} }
DAG definition requires the dag name (dag_id), schedule interval, start date, parameters, and references to the success and failure callbacks.
dag_id='{{tcsa.dag_name}}', schedule_interval=None, start_date=datetime(2023,6,2), params=myParams, is_paused_upon_creation=False, catchup=False, on_success_callback =success, on_failure_callback =failure ) as dag:
Each task of a DAG must specify the task_id, dag name, optional connection id/command, and success and failure callbacks.
Example User-Defined Action
import json from datetime import datetime import logging import urllib.parse from airflow.models import DAG from airflow.models.param import Param from airflow.providers.http.operators.http import SimpleHttpOperator from airflow.operators.python import PythonOperator from Success import * from Failure import * # callable function for handling the task task_edit_notif # modifies the notification data using user defined parameter values # then it sends the updated data to xcoms so that downstream tasks can access it def task_edit_notif_func(**context) -> None: updatedNotif = context["params"]["notification_obj"] updatedNotif["UserDefined3"] = '{{tcsa.udf3Param}}' + '-' + context["params"]["notification_obj"]["InstanceName"] updatedNotif["UserDefined4"] = "{{tcsa.udf4Param}}" quotedUrl = 'vsa_events/_doc/Name:' + urllib.parse.quote(context["params"]["notification_obj"]["Name"], safe='') + '$Source:' + context["params"]["notification_obj"]["Source"] + '$' ti = context.get('ti') # push the data to xcoms logging.info("xcom udfData value is: %s", json.dumps(updatedNotif)) ti.xcom_push(key='udfData',value=json.dumps(updatedNotif)) ti.xcom_push(key='esUrl',value=quotedUrl) logging.info("xcom esUrl value is : %s", quotedUrl) with DAG( dag_id='{{tcsa.dag_name}}', schedule_interval=None, start_date=datetime.now(), params={}, is_paused_upon_creation=False, catchup=False, on_success_callback =success, on_failure_callback =failure ) as dag: # 1. Get notification from opensearch and modify some data task_edit_notif = PythonOperator(task_id='task_edit_notif', python_callable=task_edit_notif_func, dag=dag, on_success_callback = success, on_failure_callback = failure ) # 2. Update the modified notification data in Opensearch using REST API task_update_notif = SimpleHttpOperator( task_id='task_update_notif', method='PUT', http_conn_id='{{tcsa.opensearch_connection_id}}', endpoint='[[ ti.xcom_pull(task_ids=["task_edit_notif"][0], key="esUrl") ]]', headers={"Accept":"application/json","Content-Type":"application/json"}, data = '[[ ti.xcom_pull(task_ids=["task_edit_notif"][0], key="udfData") ]]', extra_options = {'verify':False}, do_xcom_push=True, log_response=True, on_success_callback = success, on_failure_callback = failure ) # specify the order of the tasks, goes left to right task_edit_notif >> task_update_notif
The tasks must be specified at the bottom of the file. The following sequence implies that task
task_edit_notif
is executed first. Then on task
task_edit_notif
success, task
task_update_notif
is executed. If the
task_edit_notif
task fails, the
task task_update_notif
task will not run.
task_edit_notif >> task_update_notif
How to Use a Connection and a Parameter for a User-Defined Action
For creating a new Escalation or Remediation Rule with a user-defined action, you can use Connections and pass values to the Parameters.
- You can indicate a HTTP/other connection by specifying the type of connection on the left hand side and a variable starting with prefix tcsa on the right hand side, as shown below for a HTTP connection.
http_conn_id='{{tcsa.opensearch_connection_id}}'
- Define an action parameter with a variable name starting with prefix tcsa.
For example:
'{{tcsa.udf3Param}}'
as shown in the sample action:updatedNotif["UserDefined3"] = '{{tcsa.udf3Param}}'+ '-' + context["params"]["notification_obj"]["InstanceName"] updatedNotif["UserDefined4"] = '{{tcsa.udf4Param}}"
- Update the parameter name as shown in the sample user-defined action.
updatedNotif["UserDefined3"] = '{{tcsa.UserDefined3Value}}'
- When creating a Remediation Rule with that action, you will be prompted to update a value for the UserDefined3Values parameter field.
- Select a relevant connection which in this case is the connection to VMware Telco Cloud Service Assurance Opensearch named es_http.
Note: Before using above connection es_http, make sure relevant connection details are added for this connection in Actions user interface:
- Navigate to connection.
- Enter details, as given below:
You must enter a valid username and password for Opensearch.
The Schema field must be blank.
- Click Save.
- Pass a value to the relevant UserDefined3Value parameter.
- Select a relevant connection which in this case is the connection to VMware Telco Cloud Service Assurance Opensearch named es_http.