Introduction
Async-API is an API definition standard (similar to OpenAPI) that enables dynamic eventing mechanisms between peers. Workflow Hub supports using Async-API to send messages between TCA and TCSA over Kafka. As with OpenAPI, the user defines schemas, but these schemas describe messaging formats instead of REST APIs. The schema is developed according to Async-API standards and can then be uploaded to the Workflow Hub DB using the Open API tab in the UI.
Once the schema is uploaded, users can invoke an Async-API function call with arguments, similar to how REST API functions are used in a workflow. Workflow Hub parses the input arguments, populates the message according to the format, and then sends the message over the chosen protocol.
Supported Capabilities
- The supported schema version is Async API 2.1.0.
- Workflow Hub supports messages over the Kafka interface only. Other interfaces, such as AMQP and RabbitMQ, are not supported.
- The primary goal of this feature is to support TCSA integration with TCA. The message format used in our testing is the TCSA message format.
- Messages can be published. Messages cannot be consumed by Workflow Hub.
- Primitives and objects are supported in the schema. However, JSON arrays are not.
Usage
Upload Async API Schema to Workflow Hub
The user must upload the async-api schema to Workflow Hub so that the schema content is available to workflows. This is done using the Schema tab in the Workflow Hub UI.
| Async API Schema for Kafka Integration |
|---|
asyncapi: 2.1.0
id: urn:com:vmware:sebu:tca:wfh:tcsaintegration
info:
  title: TCSA Integration Message
  version: 1.0.0
  description: |
    This file describes the API through which WFH would report the status of workflows to TCSA.
  termsOfService: https://www.vmware.com/content/dam/digitalmarketing/vmware/en/pdf/product/vmware-product-guide.pdf
  license:
    name: Vmware License
    url: https://www.vmware.com
  contact:
    email: [email protected]
    name: VMware
    url: https://www.vmware.com
servers:
  local-kafka-plain:
    url: 'kafka_plain:29092'
    protocol: kafka
    protocolVersion: '7.4.0-ccs'
    description: This kafka broker is unencrypted and has no authentication
  local-kafka-sasl-plain:
    url: 'kafka_secure:29092'
    protocol: kafka
    protocolVersion: '7.4.0-ccs'
    description: This kafka broker is unencrypted but uses SASL authentication to authenticate producers/consumers
    bindings:
      sasl_username: admin
      sasl_password: YWRtaW4tc2VjcmV0
      sasl_mechanism: PLAIN
      security_protocol: SASL_PLAINTEXT
defaultContentType: application/json
channels:
  tcsastatuschannel:
    bindings:
      kafka:
        topic: tca-stages
    description: This channel is used by WFH to report the progress of the workflow
    publish:
      operationId: sendupdatetotcsa
      summary: Sends update to TCSA
      description: workflows can use this schema to send updates to TCSA about the progress of WF execution
      message:
        payload:
          type: object
          properties:
            site_id:
              type: string
            fsm_state:
              type: string
            fsm_status:
              type: string
              pattern: '^(Started|Ended|Failed)$'
            error_msg:
              type: string
              pattern: '^(None|AIR|CAAS|CNF|ZTP).+$'
            timestamp:
              type: string
            additional_data:
              $ref: "#/components/schemas/additionaldata"
          required:
            - site_id
            - fsm_state
            - fsm_status
            - error_msg
            - timestamp
components:
  schemas:
    additionaldata:
      type: object
      properties:
        task:
          type: string
        task_status:
          type: string
        event:
          type: string
        runId:
          type: string
        clustername:
          type: string
        dashboard:
          $ref: "#/components/schemas/dashboard"
        logs:
          type: string
    dashboard:
      type: object
      properties:
        aoi:
          type: string
        csr_server_type:
          type: string
        dag_run:
          type: string
        du_vendor:
          type: string
        market:
          type: string
        region:
          type: string
        server_model:
          type: string
        siteId:
          type: string
  securitySchemes:
    kafka_security:
      type: plain |
A detailed explanation of the schema is outside the scope of this document; refer to the Async-API standard. The main sections are covered below:
- Servers:
  - The servers section provides the details of the endpoints for communication. The same schema can be used to send messages to multiple Kafka servers. The authentication details of a Kafka server are provided along with the server details (for example, see the server local-kafka-sasl-plain). Other server-related parameters can be passed under the bindings, and they must correspond to librdkafka configuration parameters. The configuration parameters exposed in librdkafka use a "." between the words of a variable (for example, "builtin.features", "client.id"). In the async-api schema, the "." must be replaced by "_".
  - Note that the server URLs must be modified to suit the customer environment. The Kafka server must be up and available.
- Channels: Channels describe the communication between the peers. While multiple channels can be defined, each with its own message format, this use case (TCSA integration) uses a single message format. Consequently, a single channel is defined in the schema. A channel can have two operations, "publish" and "subscribe". Publish allows the workflow to publish events, while subscribe allows the user to receive events and act on them in the workflow. As of WFH 3.0, only publish is supported.
- Topic: Topics can be defined in the channel bindings as shown above. If the topic is NOT defined, the operationId is used as the topic name. The topic must be pre-created on the Kafka brokers to receive the message.
- OperationId: An operation must be configured with an operationId (in the example, the value is sendupdatetotcsa). The workflow refers to the operation to be performed using the operationId.
- Message: The message→payload section describes the schema of the message format. Messages can contain primitives and objects. Workflow Hub recursively works through the objects and generates the message according to the chosen schema. A message field cannot be a JSON array, as this is unsupported in Workflow Hub. As in OpenAPI, some fields can be required and others optional, and an object can refer to a component/schema. Any field of type "string" can be checked against a specific pattern using regex matching; the parameter "pattern" must be a regex expression. For example, in the message format above, the field error_msg contains an error code. As this is a predefined error code, it starts with "AIR", "CAAS", "CNF", or "ZTP", or is "None" when the workflow is successful. The async-api function checks whether the received arguments match the described pattern.
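As a concrete illustration, the topic fallback, the binding-key translation, and the pattern check described above can be sketched in Python. This is a hedged sketch with hypothetical helper names, not Workflow Hub's actual implementation:

```python
import re

def resolve_topic(channel_bindings: dict, operation_id: str) -> str:
    """Use the topic from the channel's kafka bindings; fall back to the
    operationId when no topic is defined."""
    return channel_bindings.get("kafka", {}).get("topic", operation_id)

def to_librdkafka_key(binding_key: str) -> str:
    """Map a server-binding key back to its librdkafka configuration name:
    "_" between words becomes "." (e.g. security_protocol -> security.protocol)."""
    return binding_key.replace("_", ".")

def matches_pattern(value: str, pattern: str) -> bool:
    """Check a string argument against the schema's regex "pattern"."""
    return re.match(pattern, value) is not None

# The error_msg pattern from the schema above:
ERROR_MSG_PATTERN = r'^(None|AIR|CAAS|CNF|ZTP).+$'
```

For example, `resolve_topic({"kafka": {"topic": "tca-stages"}}, "sendupdatetotcsa")` yields `tca-stages`, and `matches_pattern("AIR142027", ERROR_MSG_PATTERN)` is true.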
Create Sample Workflow
| Async API Demo Workflow |
|---|
id: async-event-demo
name: Kafka Publish
version: 0.1.0
description: Workflow to demonstrate the Event Publishing to Kafka. This workflow can be integrated with other workflows to send events about status of workflows
specVersion: 0.7.0
start: Start_workflow
functions:
  - name: PublishWorkflowStatus
    operation: db://async-api/2.1.0/kafka/publish/demo/tcsa.json#sendupdatetotcsa
    type: asyncapi
    metadata:
      tlsVerify: false
states:
  - name: Start_workflow
    type: operation
    actions:
      - functionRef:
          refName: PublishWorkflowStatus
          arguments:
            site_id: "${ .start.site_id }"
            fsm_state: "${ .start.fsm_state }"
            fsm_status: "${ .start.fsm_status }"
            error_msg: "${ .start.error_msg }"
            timestamp: "${ .start.timestamp }"
            additional_data: "${ .start.additional_data }"
            Content-Type: application/json
    transition: Sleep1
  - name: Sleep1
    type: sleep
    duration: PT1S
    transition: End_workflow
  - name: End_workflow
    type: operation
    actions:
      - functionRef:
          refName: PublishWorkflowStatus
          arguments:
            site_id: "${ .end.site_id }"
            fsm_state: "${ .end.fsm_state }"
            fsm_status: "${ .end.fsm_status }"
            error_msg: "${ .end.error_msg }"
            timestamp: "${ .end.timestamp }"
            additional_data: "${ .end.additional_data }"
            Content-Type: application/json
    end: true |
The demo workflow has two operation states, separated by a short sleep.
- Start Workflow: Start_workflow sends a message to Kafka at the beginning of the workflow. This indicates to TCSA the progress of cell site automation (in the case of composite workflows used for E2E automation).
- End Workflow: End_workflow sends a message to Kafka at the end of the workflow. This indicates to TCSA that the workflow has completed execution.
This sample workflow is available as part of the pre-built workflows to provide an example of TCSA integration.
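Conceptually, each functionRef invocation above resolves its arguments into a JSON message body that must satisfy the schema's required fields. A minimal Python sketch of that assembly step (the helper name is hypothetical, not the actual Workflow Hub code):

```python
import json

# Required fields from the schema's payload definition
REQUIRED_FIELDS = ("site_id", "fsm_state", "fsm_status", "error_msg", "timestamp")

def build_message(args: dict) -> str:
    """Assemble the Kafka message body from the functionRef arguments,
    enforcing the required fields declared in the schema."""
    missing = [f for f in REQUIRED_FIELDS if f not in args]
    if missing:
        raise ValueError(f"missing required fields: {missing}")
    return json.dumps(args)
```

A call with all required fields returns the serialized message; a call missing any of them raises, which is how a schema violation surfaces to the workflow.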
Workflow Execution
| Input.Json |
|---|
{
  "start": {
    "site_id": "esx-192-168-146-69",
    "fsm_state": "S2",
    "fsm_status": "Started",
    "error_msg": "AIR142027",
    "additional_data": {
      "task": "PushTemplateToESXiHost",
      "task_status": "Ongoing",
      "event": "Started",
      "dagrunid": "a69esx-192-168-146-692022-11-21 08:57:40.423437esx-192-168-146-692022-11-21 09:03:41.454028",
      "clustername": "None",
      "dashboard": {
        "aoi": "RIV",
        "csr_server_type": "LIT",
        "dag_run": "gharidas_auto_26051",
        "du_vendor": "Samsung",
        "market": "LAB",
        "region": "Central",
        "server_model": "Dell XR11",
        "siteId": "esx-192-168-146-69"
      }
    }
  },
  "end": {
    "site_id": "esx-192-168-146-69",
    "fsm_state": "S2",
    "fsm_status": "Failed",
    "error_msg": "AIR142027",
    "additional_data": {
      "task": "PushTemplateToESXiHost",
      "task_status": "Failed",
      "event": "ended",
      "dagrunid": "a69esx-192-168-146-692022-11-21 08:57:40.423437esx-192-168-146-692022-11-21 09:03:41.454028",
      "clustername": "None",
      "dashboard": {
        "aoi": "RIV",
        "csr_server_type": "LIT",
        "dag_run": "gharidas_auto_26051",
        "du_vendor": "Samsung",
        "market": "LAB",
        "region": "Central",
        "server_model": "Dell XR11",
        "siteId": "esx-192-168-146-69"
      },
      "logs": "Unable to connect to url. Please cross check the url and expiry time if any HTTP Error 400: Bad Request"
    }
  }
} |
In a production workflow where TCSA is to be integrated, some parameters, such as error_msg, fsm_state, fsm_status, and timestamp, will be generated from within the workflow. However, other parameters, such as dashboard, should come from the input.
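For instance, a workflow-generated timestamp can be produced in the format used by the sample data. A sketch in Python; the format string is taken from the strftime filter used in the templating example later in this document:

```python
from datetime import datetime, timezone

def make_timestamp() -> str:
    """Produce a timestamp matching strftime("%Y-%m-%d %H:%M:%S.%f"),
    the format used in the sample workflow data."""
    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")
```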
Workflow Failures
There are two types of errors:
- When the arguments passed to the async-api function do not follow the schema requirements, an error is raised and passed to the workflow. The workflow execution fails.
- When the message is properly constructed, there can still be communication errors between the Kafka broker and Workflow Hub, resulting in a message transmission failure. Message transmission failures are logged, but they do not fail the workflow.
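The two failure modes can be sketched as follows (hypothetical helper names; Workflow Hub's internals are not shown here). A schema violation raises an error that fails the workflow, while a transmission failure is only logged:

```python
import logging
import re

log = logging.getLogger("wfh.asyncapi")

def validate_or_raise(value: str, pattern: str) -> None:
    """Failure type 1: an argument that violates the schema raises,
    and the workflow execution fails."""
    if re.match(pattern, value) is None:
        raise ValueError(f"argument {value!r} does not match pattern {pattern!r}")

def publish(send, topic: str, message: str) -> bool:
    """Failure type 2: a transmission error is logged but does not
    fail the workflow."""
    try:
        send(topic, message)
        return True
    except OSError as exc:
        log.error("failed to publish to %s: %s", topic, exc)
        return False
```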
DNS Update for TCSA-Integration
The TCSA Kafka server name is hard-coded to kafka-edge in every deployment. Consequently, the server certificate presented by the TCSA Kafka broker is for the host name kafka-edge. This host name is not routable without a DNS entry that resolves it to the corresponding IP. Therefore, ensure that TCA's core-dns module has an entry for kafka-edge. This is currently a manual step, and it is required whenever integrating with TCSA over secure Kafka.
Templating Support
VMware Telco Cloud Automation 3.1.0 supports templating of AsyncAPI schemas.
| Async API Schema with Templating Support |
|---|
asyncapi: 2.1.0
id: urn:com:vmware:sebu:tca:wfh:tcsaintegration
info:
  title: TCSA Integration Message
  version: 1.0.0
  description: |
    This file describes the API through which WFH would report the status of workflows to TCSA.
  termsOfService: https://www.vmware.com/content/dam/digitalmarketing/vmware/en/pdf/product/vmware-product-guide.pdf
  license:
    name: Vmware License
    url: https://www.vmware.com
  contact:
    email: [email protected]
    name: VMware
    url: https://www.vmware.com
servers:
  local-kafka-sasl-secure:
    url: '{server}:{port}'
    variables:
      server:
        description: This value is assigned by the service provider, in this example gigantic-server.com
      port:
        description: This port used for communication
    protocol: kafka
    protocolVersion: '7.4.0-ccs'
    description: This kafka broker is encrypted with TLS and uses SCRAM-512 based auth SASL_SSL protocol
    bindings:
      kafka:
        sasl_username:
          type: string
        sasl_password:
          type: string
        sasl_mechanism: 'PLAIN'
        security_protocol:
          type: string
defaultContentType: application/json
channels:
  tcsastatuschannel:
    bindings:
      kafka:
        topic: tca-stages
    description: This channel is used by WFH to report the progress of the workflow
    publish:
      operationId: sendupdatetotcsa
      summary: Sends update to TCSA
      description: workflows can use this schema to send updates to TCSA about the progress of WF execution
      message:
        headers:
          $ref: "#/components/schemas/headers"
        payload:
          type: object
          properties:
            site_id:
              type: integer
              maximum: 3
              minimum: 1
            fsm_state:
              type: number
              maximum: 0.4
              minimum: 0.1
          required:
            - site_id
            - fsm_state
components:
  schemas:
    headers:
      type: object
      properties:
        fsm_status:
          type: boolean
          default: false
        abc:
          type: boolean
          default: false
        error_msg:
          type: string
          pattern: '^(None|AIR|CAAS|CNF|ZTP).+$'
          minimum: 3
          maximum: 10
        timestamp:
          type: string
          format: date-time
      required:
        - fsm_status
        - error_msg
        - timestamp
    additionaldata:
      type: object
      properties:
        task:
          type: string
        task_status:
          type: string
        event:
          type: string
        runId:
          type: string
        clustername:
          type: string
        dashboard:
          $ref: "#/components/schemas/dashboard"
        logs:
          type: string
    dashboard:
      type: object
      properties:
        aoi:
          type: string
        csr_server_type:
          type: string
        dag_run:
          type: string
        du_vendor:
          type: string
        market:
          type: string
        region:
          type: string
        server_model:
          type: string
        siteId:
          type: string
  securitySchemes:
    kafka_security:
      type: plain |
| Templating Demo Workflow |
|---|
id: templatedemo
name: templatedemo
version: 0.1.0
description: >-
  Workflow to demonstrate the Event Publishing to Kafka using an async api template.
specVersion: 0.7.0
start: Start_workflow
functions:
  - name: PublishWorkflowStatus
    operation: 'db://async-api/2.1.0/template/demo/tcsa.json#sendupdatetotcsa'
    type: asyncapi
    metadata:
      tlsVerify: false
states:
  - name: Start_workflow
    type: operation
    actions:
      - functionRef:
          refName: PublishWorkflowStatus
          arguments:
            site_id: '${ .start.site_id }'
            fsm_state: '${ .start.fsm_state }'
            fsm_status: '${ .start.fsm_status }'
            error_msg: '${ .start.error_msg }'
            timestamp: '${ now | strftime("%Y-%m-%d %H:%M:%S.%f")}'
            additional_data: '${ .start.additional_data }'
            server: '${ .start.server }'
            port: '${ .start.port }'
            Content-Type: application/json
    stateDataFilter:
      output: '${ .timestamp1 = (now | strftime("%Y-%m-%d %H:%M:%S.%f")) }'
    transition: Sleep1
  - name: Sleep1
    type: sleep
    duration: PT1S
    transition: End_workflow
  - name: End_workflow
    type: operation
    actions:
      - functionRef:
          refName: PublishWorkflowStatus
          arguments:
            site_id: '${ .end.site_id }'
            fsm_state: '${ .end.fsm_state }'
            fsm_status: '${ .end.fsm_status }'
            error_msg: '${ .end.error_msg }'
            timestamp: '${ now | strftime("%Y-%m-%d %H:%M:%S.%f")}'
            additional_data: '${ .end.additional_data }'
            server: '${ .end.server }'
            port: '${ .end.port }'
            Content-Type: application/json
    stateDataFilter:
      output: '${ .timestamp2 = (now | strftime("%Y-%m-%d %H:%M:%S.%f")) }'
    end: true |
| Input.Json |
|---|
{
  "start": {
    "site_id": "esx-192-168-146-69",
    "fsm_state": "S2",
    "fsm_status": "Started",
    "server": "10.220.226.116",
    "port": "9092",
    "sasl_username": "admin",
    "sasl_password": "YWRtaW4tc2VjcmV0",
    "security_protocol": "SASL_PLAINTEXT",
    "error_msg": "AIR7891",
    "additional_data": {
      "task": "PushTemplateToESXiHost",
      "task_status": "Ongoing",
      "event": "Started",
      "dagrunid": "a69esx-192-168-146-692022-11-21 08:57:40.423437esx-192-168-146-692022-11-21 09:03:41.454028",
      "clustername": "None",
      "dashboard": {
        "aoi": "RIV",
        "csr_server_type": "LIT",
        "dag_run": "gharidas_auto_26051",
        "du_vendor": "Samsung",
        "market": "LAB",
        "region": "Central",
        "server_model": "Dell XR11",
        "siteId": "esx-192-168-146-69"
      }
    }
  },
  "end": {
    "site_id": "esx-192-168-146-69",
    "fsm_state": "S2",
    "fsm_status": "Failed",
    "server": "10.220.226.116",
    "port": "9092",
    "error_msg": "AIR7891",
    "additional_data": {
      "task": "PushTemplateToESXiHost",
      "task_status": "Failed",
      "event": "ended",
      "dagrunid": "a69esx-192-168-146-692022-11-21 08:57:40.423437esx-192-168-146-692022-11-21 09:03:41.454028",
      "clustername": "None",
      "dashboard": {
        "aoi": "RIV",
        "csr_server_type": "LIT",
        "dag_run": "gharidas_auto_26051",
        "du_vendor": "Samsung",
        "market": "LAB",
        "region": "Central",
        "server_model": "Dell XR11",
        "siteId": "esx-192-168-146-69"
      },
      "logs": "Unable to connect to url. Please cross check the url and expiry time if any HTTP Error 400: Bad Request"
    }
  }
} |
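At run time, the {server} and {port} placeholders in the templated server URL are filled from the workflow arguments shown above. A Python sketch of that substitution (assumed behavior inferred from the template and the sample input; not the actual Workflow Hub code):

```python
def resolve_url(template: str, variables: dict) -> str:
    """Substitute AsyncAPI server variables such as {server} and {port}
    into the templated server URL."""
    url = template
    for name, value in variables.items():
        url = url.replace("{" + name + "}", str(value))
    return url
```

For example, `resolve_url('{server}:{port}', {"server": "10.220.226.116", "port": "9092"})` yields `10.220.226.116:9092`.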