Introduction
AsyncAPI is an API definition standard (similar to OpenAPI) that enables event-driven messaging between peers. Workflow Hub supports using AsyncAPI to send messages between TCA and TCSA over Kafka. As with OpenAPI, the user defines schemas, but these schemas describe messaging formats instead of REST APIs. The schema is written according to the AsyncAPI standard and can then be uploaded to the Workflow Hub DB using the Schema tab in the UI.
Once the schema is uploaded, users can invoke an AsyncAPI function call with arguments, similar to how they would use REST API functions in a workflow. Workflow Hub parses the input arguments, populates the message according to the format, and sends the message over the chosen protocol.
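For example, once a schema is uploaded, a workflow declares an AsyncAPI function that points at the stored schema and the operationId to invoke (this fragment is taken from the full sample workflow later in this section):

```yaml
functions:
  - name: PublishWorkflowStatus
    # db:// path of the uploaded schema, followed by the operationId to invoke
    operation: db://async-api/2.1.0/kafka/publish/demo/tcsa.json#sendupdatetotcsa
    type: asyncapi
```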
Supported Capabilities
- The supported schema version is AsyncAPI 2.1.0.
- Workflow Hub supports messages over the Kafka interface only. Other interfaces such as AMQP, RabbitMQ, and so on are not supported.
- The primary goal of this feature is to support TCSA integration with TCA. The message format used in our testing is the TCSA message format.
- Messages can be published. Messages cannot be consumed by Workflow Hub.
- Primitives and objects are supported in the schema; JSON arrays are not.
Usage
Upload Async API Schema to Workflow Hub
The user must upload the AsyncAPI schema to Workflow Hub so that the schema content is available to workflows. This is done using the Schema tab in the Workflow Hub UI.
Async API Schema for Kafka Integration

```yaml
asyncapi: 2.1.0
id: urn:com:vmware:sebu:tca:wfh:tcsaintegration
info:
  title: TCSA Integration Message
  version: 1.0.0
  description: |
    This file describes the API through which WFH would report the status of workflows to TCSA.
  termsOfService: https://www.vmware.com/content/dam/digitalmarketing/vmware/en/pdf/product/vmware-product-guide.pdf
  license:
    name: Vmware License
    url: https://www.vmware.com
  contact:
    email: [email protected]
    name: VMware
    url: https://www.vmware.com
servers:
  local-kafka-plain:
    url: 'kafka_plain:29092'
    protocol: kafka
    protocolVersion: '7.4.0-ccs'
    description: This kafka broker is unencrypted and has no authentication
  local-kafka-sasl-plain:
    url: 'kafka_secure:29092'
    protocol: kafka
    protocolVersion: '7.4.0-ccs'
    description: This kafka broker is unencrypted but uses SASL authentication to authenticate producers/consumers
    bindings:
      sasl_username: admin
      sasl_password: YWRtaW4tc2VjcmV0
      sasl_mechanism: PLAIN
      security_protocol: SASL_PLAINTEXT
defaultContentType: application/json
channels:
  tcsastatuschannel:
    bindings:
      kafka:
        topic: tca-stages
    description: This channel is used by WFH to report the progress of the workflow
    publish:
      operationId: sendupdatetotcsa
      summary: Sends update to TCSA
      description: workflows can use this schema to send updates to TCSA about the progress of WF execution
      message:
        payload:
          type: object
          properties:
            site_id:
              type: string
            fsm_state:
              type: string
            fsm_status:
              type: string
              pattern: '^(Started|Ended|Failed)$'
            error_msg:
              type: string
              pattern: '^(None|AIR|CAAS|CNF|ZTP).+$'
            timestamp:
              type: string
            additional_data:
              $ref: "#/components/schemas/additionaldata"
          required:
            - site_id
            - fsm_state
            - fsm_status
            - error_msg
            - timestamp
components:
  schemas:
    additionaldata:
      type: object
      properties:
        task:
          type: string
        task_status:
          type: string
        event:
          type: string
        runId:
          type: string
        clustername:
          type: string
        dashboard:
          $ref: "#/components/schemas/dashboard"
        logs:
          type: string
    dashboard:
      type: object
      properties:
        aoi:
          type: string
        csr_server_type:
          type: string
        dag_run:
          type: string
        du_vendor:
          type: string
        market:
          type: string
        region:
          type: string
        server_model:
          type: string
        siteId:
          type: string
  securitySchemes:
    kafka_security:
      type: plain
```
A detailed explanation of the schema is outside the scope of this document; refer to the AsyncAPI standard. The main sections are covered below.
- Servers:
  - In the servers section, the details of the endpoints for communication are provided. The same schema can be used to send messages to multiple Kafka servers. The authentication details of a Kafka server are provided along with the server details (for example, see the server local-kafka-sasl-plain). Other server-related parameters can be passed under the bindings, and they must correspond to librdkafka configuration parameters. The configuration parameters exposed by librdkafka use a "." between the words of a variable (for example, "builtin.features", "client.id"). In the AsyncAPI schema, the "." must be replaced with "_" (see the server-bindings sketch after this list).
  - Note that the server URLs must be modified to suit the customer environment. The Kafka server must be up and available.
- Channels: Channels describe the communication protocol between the peers. While multiple channels can be defined, each with its own message format, the TCSA integration use case uses a single message format; consequently, a single channel is defined in the schema. A channel can have two operations, "publish" and "subscribe". Publish allows the workflow to publish events, while subscribe allows the user to receive events and act on them in the workflow. As of WFH 3.0, only publish is supported.
- Topic: Topics can be defined in the channel bindings as shown above. If the topic is not defined, the operationId is used as the topic name. The topic must be pre-created on the Kafka brokers to receive the message.
- OperationId: An operation must be configured with an operationId (in the example, its value is sendupdatetotcsa). The workflow refers to the operation to be performed using this operationId.
- message: The message→payload section describes the schema of the message format. Messages can contain primitives and objects; Workflow Hub recursively works through the objects and generates the message according to the chosen schema. A message field cannot be a JSON array, as arrays are unsupported in Workflow Hub. As with OpenAPI, some fields can be required while others are optional, and an object can refer to an entry under components/schemas. Any field of type "string" can be checked against a specific pattern using regex matching; the "pattern" parameter must be a regex expression. For example, in the message format above, the field error_msg contains an error code: a predefined code starts with "AIR", "CAAS", "CNF", or "ZTP", or with "None" when the workflow is successful. Workflow Hub checks whether the received arguments match the described pattern (see the pattern-validation sketch after this list).
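To illustrate the binding naming convention, below is a minimal sketch of a hypothetical server entry; the broker address and values are placeholders, and each binding key mirrors a librdkafka parameter with "." replaced by "_":

```yaml
servers:
  example-kafka:
    url: 'example-broker:9092'   # placeholder broker address
    protocol: kafka
    bindings:
      # librdkafka "client.id" becomes "client_id"
      client_id: wfh-producer
      # librdkafka "security.protocol" becomes "security_protocol"
      security_protocol: SASL_PLAINTEXT
      # librdkafka "sasl.mechanism" becomes "sasl_mechanism"
      sasl_mechanism: PLAIN
```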
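As a sketch of pattern validation, consider the error_msg pattern from the schema above; the sample values are illustrative only:

```yaml
error_msg:
  type: string
  pattern: '^(None|AIR|CAAS|CNF|ZTP).+$'
# "AIR142027" -> accepted: starts with "AIR" followed by at least one character
# "Success"   -> rejected: no matching prefix, so the function call raises an
#                error and the workflow execution fails
```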
Create Sample Workflow
Async API Demo Workflow

```yaml
id: async-event-demo
name: Kafka Publish
version: 0.1.0
description: Workflow to demonstrate the Event Publishing to Kafka. This workflow can be integrated with other workflows to send events about status of workflows
specVersion: 0.7.0
start: Start_workflow
functions:
  - name: PublishWorkflowStatus
    operation: db://async-api/2.1.0/kafka/publish/demo/tcsa.json#sendupdatetotcsa
    type: asyncapi
    metadata:
      tlsVerify: false
states:
  - name: Start_workflow
    type: operation
    actions:
      - functionRef:
          refName: PublishWorkflowStatus
          arguments:
            site_id: "${ .start.site_id }"
            fsm_state: "${ .start.fsm_state }"
            fsm_status: "${ .start.fsm_status }"
            error_msg: "${ .start.error_msg }"
            timestamp: "${ .start.timestamp }"
            additional_data: "${ .start.additional_data }"
            Content-Type: application/json
    transition: Sleep1
  - name: Sleep1
    type: sleep
    duration: PT1S
    transition: End_workflow
  - name: End_workflow
    type: operation
    actions:
      - functionRef:
          refName: PublishWorkflowStatus
          arguments:
            site_id: "${ .end.site_id }"
            fsm_state: "${ .end.fsm_state }"
            fsm_status: "${ .end.fsm_status }"
            error_msg: "${ .end.error_msg }"
            timestamp: "${ .end.timestamp }"
            additional_data: "${ .end.additional_data }"
            Content-Type: application/json
    end: true
```
The demo workflow has two states.
- Start Workflow: The Start_workflow state sends a message to Kafka at the beginning of the workflow. This indicates to TCSA the progress of cell site automation (in the case of composite workflows used for E2E automation).
- End Workflow: The End_workflow state sends a message to Kafka at the end of the workflow. This indicates to TCSA that the workflow has completed execution.
This sample workflow is available as part of the pre-built workflows to provide an example of TCSA integration.
Workflow Execution
Input.Json

```json
{
  "start": {
    "site_id": "esx-192-168-146-69",
    "fsm_state": "S2",
    "fsm_status": "Started",
    "error_msg": "AIR142027",
    "additional_data": {
      "task": "PushTemplateToESXiHost",
      "task_status": "Ongoing",
      "event": "Started",
      "dagrunid": "a69esx-192-168-146-692022-11-21 08:57:40.423437esx-192-168-146-692022-11-21 09:03:41.454028",
      "clustername": "None",
      "dashboard": {
        "aoi": "RIV",
        "csr_server_type": "LIT",
        "dag_run": "gharidas_auto_26051",
        "du_vendor": "Samsung",
        "market": "LAB",
        "region": "Central",
        "server_model": "Dell XR11",
        "siteId": "esx-192-168-146-69"
      }
    }
  },
  "end": {
    "site_id": "esx-192-168-146-69",
    "fsm_state": "S2",
    "fsm_status": "Failed",
    "error_msg": "AIR142027",
    "additional_data": {
      "task": "PushTemplateToESXiHost",
      "task_status": "Failed",
      "event": "ended",
      "dagrunid": "a69esx-192-168-146-692022-11-21 08:57:40.423437esx-192-168-146-692022-11-21 09:03:41.454028",
      "clustername": "None",
      "dashboard": {
        "aoi": "RIV",
        "csr_server_type": "LIT",
        "dag_run": "gharidas_auto_26051",
        "du_vendor": "Samsung",
        "market": "LAB",
        "region": "Central",
        "server_model": "Dell XR11",
        "siteId": "esx-192-168-146-69"
      },
      "logs": "Unable to connect to url. Please cross check the url and expiry time if any HTTP Error 400: Bad Request"
    }
  }
}
```
In a production workflow where TCSA is integrated, some parameters such as error_msg, fsm_state, fsm_status, and timestamp are generated from within the workflow, while other parameters such as dashboard should come from the input, as shown in the sketch below.
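As a sketch of that split (reusing the strftime expression from the templating example later in this document), a production workflow can generate the timestamp inline while still taking the dashboard data from the workflow input:

```yaml
arguments:
  # generated inside the workflow at send time
  timestamp: '${ now | strftime("%Y-%m-%d %H:%M:%S.%f") }'
  # taken from the workflow input (carries the dashboard object)
  additional_data: '${ .start.additional_data }'
```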
Workflow Failures
There are two types of errors:
- When the arguments passed to the AsyncAPI function do not follow the schema requirements, an error is raised and passed to the workflow, and the workflow execution fails (see the example after this list).
- When the message is properly constructed, there can still be communication errors between the Kafka broker and Workflow Hub, resulting in a message transmission failure. Message transmission failures are logged, but they do not fail the workflow.
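For example, a hypothetical invocation against the schema above fails in the first way when fsm_status does not match its declared pattern:

```yaml
arguments:
  site_id: "esx-192-168-146-69"
  fsm_state: "S2"
  fsm_status: "Running"   # fails pattern '^(Started|Ended|Failed)$';
                          # an error is raised and the workflow fails
  error_msg: "AIR142027"
  timestamp: "2022-11-21 08:57:40"
```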
DNS Update for TCSA-Integration
The TCSA Kafka server name is hard-coded to kafka-edge in every deployment. Consequently, the server certificate presented by the TCSA Kafka broker is issued for the host name kafka-edge. This host is not routable without a DNS entry that resolves it to the corresponding IP. Therefore, ensure that TCA's core-dns module has an entry for kafka-edge. This is currently a manual step, and it is required whenever integrating with TCSA over secure Kafka.
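As a sketch (assuming a standard Kubernetes CoreDNS deployment; the IP address below is a placeholder for the TCSA Kafka broker IP), the entry can be added through the hosts plugin in the coredns ConfigMap:

```yaml
# Hypothetical excerpt of the coredns ConfigMap in the kube-system namespace
# (kubectl -n kube-system edit configmap coredns)
data:
  Corefile: |
    .:53 {
        # Resolve the hard-coded TCSA Kafka host name to a routable IP
        hosts {
            10.0.0.100 kafka-edge
            fallthrough
        }
        forward . /etc/resolv.conf
    }
```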
Templating Support
VMware Telco Cloud Automation 3.1.0 supports templating of AsyncAPI schemas. The templated schema below parameterizes the server URL and security bindings; the accompanying demo workflow and input show how the values are supplied at run time.
Templated Async API Schema

```yaml
asyncapi: 2.1.0
id: urn:com:vmware:sebu:tca:wfh:tcsaintegration
info:
  title: TCSA Integration Message
  version: 1.0.0
  description: |
    This file describes the API through which WFH would report the status of workflows to TCSA.
  termsOfService: https://www.vmware.com/content/dam/digitalmarketing/vmware/en/pdf/product/vmware-product-guide.pdf
  license:
    name: Vmware License
    url: https://www.vmware.com
  contact:
    email: [email protected]
    name: VMware
    url: https://www.vmware.com
servers:
  local-kafka-sasl-secure:
    url: '{server}:{port}'
    variables:
      server:
        description: This value is assigned by the service provider, in this example gigantic-server.com
      port:
        description: This port used for communication
    protocol: kafka
    protocolVersion: '7.4.0-ccs'
    description: This kafka broker is encrypted with TLS and uses SCRAM-512 based auth SASL_SSL protocol
    bindings:
      kafka:
        sasl_username:
          type: string
        sasl_password:
          type: string
        sasl_mechanism: 'PLAIN'
        security_protocol:
          type: string
defaultContentType: application/json
channels:
  tcsastatuschannel:
    bindings:
      kafka:
        topic: tca-stages
    description: This channel is used by WFH to report the progress of the workflow
    publish:
      operationId: sendupdatetotcsa
      summary: Sends update to TCSA
      description: workflows can use this schema to send updates to TCSA about the progress of WF execution
      message:
        headers:
          $ref: "#/components/schemas/headers"
        payload:
          type: object
          properties:
            site_id:
              type: integer
              maximum: 3
              minimum: 1
            fsm_state:
              type: number
              maximum: 0.4
              minimum: 0.1
          required:
            - site_id
            - fsm_state
components:
  schemas:
    headers:
      type: object
      properties:
        fsm_status:
          type: boolean
          default: false
        abc:
          type: boolean
          default: false
        error_msg:
          type: string
          pattern: '^(None|AIR|CAAS|CNF|ZTP).+$'
          minimum: 3
          maximum: 10
        timestamp:
          type: string
          format: date-time
      required:
        - fsm_status
        - error_msg
        - timestamp
    additionaldata:
      type: object
      properties:
        task:
          type: string
        task_status:
          type: string
        event:
          type: string
        runId:
          type: string
        clustername:
          type: string
        dashboard:
          $ref: "#/components/schemas/dashboard"
        logs:
          type: string
    dashboard:
      type: object
      properties:
        aio:
          type: string
        csr_server_type:
          type: string
        dag_run:
          type: string
        du_vendor:
          type: string
        market:
          type: string
        region:
          type: string
        server_model:
          type: string
        siteId:
          type: string
  securitySchemes:
    kafka_security:
      type: plain
```
Template Demo Workflow

```yaml
id: templatedemo
name: templatedemo
version: 0.1.0
description: >-
  Workflow to demonstrate the Event Publishing to Kafka using an async api
  template.
specVersion: 0.7.0
start: Start_workflow
functions:
  - name: PublishWorkflowStatus
    operation: 'db://async-api/2.1.0/template/demo/tcsa.json#sendupdatetotcsa'
    type: asyncapi
    metadata:
      tlsVerify: false
states:
  - name: Start_workflow
    type: operation
    actions:
      - functionRef:
          refName: PublishWorkflowStatus
          arguments:
            site_id: '${ .start.site_id }'
            fsm_state: '${ .start.fsm_state }'
            fsm_status: '${ .start.fsm_status }'
            error_msg: '${ .start.error_msg }'
            timestamp: '${ now | strftime("%Y-%m-%d %H:%M:%S.%f")}'
            additional_data: '${ .start.additional_data }'
            server: '${ .start.server }'
            port: '${ .start.port }'
            Content-Type: application/json
    stateDataFilter:
      output: '${ .timestamp1 = (now | strftime("%Y-%m-%d %H:%M:%S.%f")) }'
    transition: Sleep1
  - name: Sleep1
    type: sleep
    duration: PT1S
    transition: End_workflow
  - name: End_workflow
    type: operation
    actions:
      - functionRef:
          refName: PublishWorkflowStatus
          arguments:
            site_id: '${ .end.site_id }'
            fsm_state: '${ .end.fsm_state }'
            fsm_status: '${ .end.fsm_status }'
            error_msg: '${ .end.error_msg }'
            timestamp: '${ now | strftime("%Y-%m-%d %H:%M:%S.%f")}'
            additional_data: '${ .end.additional_data }'
            server: '${ .end.server }'
            port: '${ .end.port }'
            Content-Type: application/json
    stateDataFilter:
      output: '${ .timestamp2 = (now | strftime("%Y-%m-%d %H:%M:%S.%f")) }'
    end: true
```
{ "start": { "site_id": "esx-192-168-146-69", "fsm_state": "S2", "fsm_status": "Started", "server": "10.220.226.116", "port": "9092", "server": "localhost", "sasl_username": "admin", "sasl_password": "YWRtaW4tc2VjcmV0", "security_protocol": "SASL_PLAINTEXT", "error_msg": "AIR7891", "additional_data": { "task": "PushTemplateToESXiHost", "task_status": "Ongoing", "event": "Started", "dagrunid": "a69esx-192-168-146-692022-11-21 08:57:40.423437esx-192-168-146-692022-11-21 09:03:41.454028", "clustername": "None", "dashboard": { "aoi": "RIV", "csr_server_type": "LIT", "dag_run": "gharidas_auto_26051", "du_vendor": "Samsung", "market": "LAB", "region": "Central", "server_model": "Dell XR11", "siteId": "esx-192-168-146-69" } } }, "end": { "site_id": "esx-192-168-146-69", "fsm_state": "S2", "fsm_status": "Failed", "server": "10.220.226.116", "port": "9092", "error_msg": "AIR7891", "additional_data": { "task": "PushTemplateToESXiHost", "task_status": "Failed", "event": "ended", "dagrunid": "a69esx-192-168-146-692022-11-21 08:57:40.423437esx-192-168-146-692022-11-21 09:03:41.454028", "clustername": "None", "dashboard": { "aoi": "RIV", "csr_server_type": "LIT", "dag_run": "gharidas_auto_26051", "du_vendor": "Samsung", "market": "LAB", "region": "Central", "server_model": "Dell XR11", "siteId": "esx-192-168-146-69" }, "logs": "Unable to connect to url. Please cross check the url and expiry time if any HTTP Error 400: Bad Request" } } } |