Introduction

AsyncAPI is an API definition standard (similar to OpenAPI) that enables dynamic eventing between peers. Workflow Hub supports using AsyncAPI to send messages between TCA and TCSA over Kafka. As with OpenAPI, users define schemas, but these schemas describe message formats rather than REST APIs. The schema is developed according to the AsyncAPI standard and can then be uploaded to the Workflow Hub database using the Schema tab in the UI.

Once the schema is uploaded, users can invoke an AsyncAPI function with arguments, much as they would invoke REST API functions in a workflow. Workflow Hub parses the input arguments, populates the message according to the schema, and sends the message over the chosen protocol.
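
For example, a workflow declares an AsyncAPI function by pointing at the uploaded schema and an operationId within it (this fragment is taken from the full sample workflow later in this document):
functions:
  - name: PublishWorkflowStatus
    operation: db://async-api/2.1.0/kafka/publish/demo/tcsa.json#sendupdatetotcsa
    type: asyncapi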

Supported Capabilities

  • The supported schema version is AsyncAPI 2.1.0.
  • Workflow Hub supports messages over the Kafka interface only. Other interfaces, such as AMQP and RabbitMQ, are not supported.
  • The primary goal of this feature is to support TCSA integration with TCA. The message format used in our testing is the TCSA message format.
  • Messages can be published. Messages cannot be consumed by Workflow Hub.
  • Primitives and objects are supported in the schema; JSON arrays are not (illustrated below).
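
To illustrate the last point, the following hypothetical payload fragments show what is and is not accepted:
# Supported: primitives and nested objects
payload:
  type: object
  properties:
    site_id:
      type: string
    additional_data:
      $ref: "#/components/schemas/additionaldata"

# Not supported: array-typed fields
payload:
  type: object
  properties:
    site_ids:
      type: array        # JSON arrays are rejected by Workflow Hub
      items:
        type: string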

Usage

Upload the AsyncAPI Schema to Workflow Hub

The user must upload the AsyncAPI schema to Workflow Hub so that the schema content is available to workflows. This is done using the Schema tab in the Workflow Hub UI.

A sample AsyncAPI schema (used for TCSA integration) is provided below.
Async API Schema for Kafka Integration
asyncapi: 2.1.0
id: urn:com:vmware:sebu:tca:wfh:tcsaintegration
info:
  title: TCSA Integration Message
  version: 1.0.0
  description: |
    This file describes the API through which WFH would report the status of workflows to TCSA.
  termsOfService: https://www.vmware.com/content/dam/digitalmarketing/vmware/en/pdf/product/vmware-product-guide.pdf
  license:
    name: VMware License
    url: https://www.vmware.com
  contact:
    email: [email protected]
    name: VMware
    url: https://www.vmware.com
 
servers:
  local-kafka-plain:
    url: 'kafka_plain:29092'
    protocol: kafka
    protocolVersion: '7.4.0-ccs'
    description: This kafka broker is unencrypted and has no authentication
  local-kafka-sasl-plain:
    url: 'kafka_secure:29092'
    protocol: kafka
    protocolVersion: '7.4.0-ccs'
    description: This kafka broker is unencrypted but uses SASL authentication to authenticate producers/consumers
    bindings:
      sasl_username: admin
      sasl_password: YWRtaW4tc2VjcmV0
      sasl_mechanism: PLAIN
      security_protocol: SASL_PLAINTEXT
 
defaultContentType: application/json
 
channels:
  tcsastatuschannel:
    bindings:
      kafka:
        topic: tca-stages
    description: This channel is used by WFH to report the progress of the workflow
    publish:
      operationId: sendupdatetotcsa
      summary: Sends update to TCSA
      description: workflows can use this schema to send updates to TCSA about the progress of WF execution
      message:
        payload:
          type: object
          properties:
            site_id:
              type: string
            fsm_state:
              type: string
            fsm_status:
              type: string
              pattern: '^(Started|Ended|Failed)$'
            error_msg:
              type: string
              pattern: '^(None|AIR|CAAS|CNF|ZTP).+$'
            timestamp:
              type: string
            additional_data:
              $ref: "#/components/schemas/additionaldata"
          required:
            - site_id
            - fsm_state
            - fsm_status
            - error_msg
            - timestamp
 
components:
  schemas:
    additionaldata:
      type: object
      properties:
        task:
          type: string
        task_status:
          type: string
        event:
          type: string
        runId:
          type: string
        clustername:
          type: string
        dashboard:
          $ref: "#/components/schemas/dashboard"
        logs:
          type: string
    dashboard:
      type: object
      properties:
        aoi:
          type: string
        csr_server_type:
          type: string
        dag_run:
          type: string
        du_vendor:
          type: string
        market:
          type: string
        region:
          type: string
        server_model:
          type: string
        siteId:
          type: string
  securitySchemes:
    kafka_security:
      type: plain

A detailed explanation of the schema is outside the scope of this document; refer to the AsyncAPI standard. The main sections are covered below:

  • Servers:
    • The servers section provides the details of the endpoints for communication. The same schema can be used to send messages to multiple Kafka servers. The authentication details of a Kafka server are provided along with the server details (for example, see the server local-kafka-sasl-plain). Other server-related parameters can be passed under bindings, and they must correspond to librdkafka configuration parameters. librdkafka configuration parameters use "." between the words of a variable (for example, "builtin.features" and "client.id"); in the AsyncAPI schema, the "." must be replaced by "_" (see the sketch after this list).
    • Note that the server URLs must be modified to suit the customer environment, and the Kafka server must be up and available.
  • Channels: Channels describe the communication between the peers. While multiple channels can be defined, each with its own message format, the TCSA integration use case needs only a single message format; consequently, a single channel is defined in the schema. A channel can have two operations, "publish" and "subscribe". Publish allows the workflow to publish events, while subscribe allows the user to receive events and act on them in the workflow. As of WFH 3.0, only publish is supported.
  • Topic: Topics can be defined in the channel bindings as mentioned above. If the topic is not defined, the operationId is used as the topic name. The topic must be pre-created on the Kafka brokers to receive the message.
  • OperationId: An operation must be configured with an operationId (in the example, its value is sendupdatetotcsa). The workflow refers to the operation to be performed using this operationId.
  • message: The message→payload section describes the schema of the message format. Messages can contain primitives and objects; Workflow Hub recursively works through the objects and generates the message according to the chosen schema. A message field cannot be a JSON array, as arrays are unsupported in Workflow Hub. As with OpenAPI, some fields can be required and some optional, and an object can refer to an entry under components/schemas. Any field of type "string" can be checked against a specific pattern using regex matching; the "pattern" parameter must be a regular expression. For example, in the message format above, the field error_msg contains an error code: a predefined code starts with "AIR", "CAAS", "CNF", or "ZTP", or is "None" when the workflow is successful. Workflow Hub checks whether the received arguments match the described pattern.
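
The following minimal fragment ties these points together. It is a sketch with hypothetical names (my-kafka, demochannel, demo-topic, senddemoevent), not the TCSA schema:
asyncapi: 2.1.0
id: urn:example:asyncapi:sketch
info:
  title: Annotated Sketch
  version: 1.0.0
servers:
  my-kafka:
    url: 'my-kafka-host:9092'
    protocol: kafka
    bindings:
      client_id: wfh-demo            # librdkafka "client.id", with "." replaced by "_"
channels:
  demochannel:
    bindings:
      kafka:
        topic: demo-topic            # if omitted, the operationId below becomes the topic name
    publish:
      operationId: senddemoevent     # workflows reference the operation by this id
      message:
        payload:
          type: object
          properties:
            error_msg:
              type: string
              pattern: '^(None|AIR|CAAS|CNF|ZTP).+$'   # string arguments are regex-checked
          required:
            - error_msg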

Create Sample Workflow

A sample workflow demonstrating event publishing is given below.
Async API Demo Workflow
id: async-event-demo
name: Kafka Publish
version: 0.1.0
description: Workflow to demonstrate the Event Publishing to Kafka. This workflow can be integrated with other workflows to send events about status of workflows
specVersion: 0.7.0
start: Start_workflow
functions:
  - name: PublishWorkflowStatus
    operation: db://async-api/2.1.0/kafka/publish/demo/tcsa.json#sendupdatetotcsa
    type: asyncapi
    metadata:
      tlsVerify: false
 
states:
  - name: Start_workflow
    type: operation
    actions:
      - functionRef:
          refName: PublishWorkflowStatus
          arguments:
            site_id: "${ .start.site_id }"
            fsm_state: "${ .start.fsm_state }"
            fsm_status: "${ .start.fsm_status }"
            error_msg: "${ .start.error_msg }"
            timestamp: "${ .start.timestamp }"
            additional_data: "${ .start.additional_data }"
            Content-Type: application/json
    transition: Sleep1
 
  - name: Sleep1
    type: sleep
    duration: PT1S
    transition: End_workflow
 
  - name: End_workflow
    type: operation
    actions:
      - functionRef:
          refName: PublishWorkflowStatus
          arguments:
            site_id: "${ .end.site_id }"
            fsm_state: "${ .end.fsm_state }"
            fsm_status: "${ .end.fsm_status }"
            error_msg: "${ .end.error_msg }"
            timestamp: "${ .end.timestamp }"
            additional_data: "${ .end.additional_data }"
            Content-Type: application/json
    end: true

The demo workflow has two publishing states, separated by a short sleep state.

  • Start_workflow: Sends the message to Kafka at the beginning of the workflow. This indicates to TCSA the progress of cell site automation (in the case of composite workflows used for end-to-end automation).
  • End_workflow: Sends the message to Kafka at the end of the workflow. This indicates to TCSA that the workflow has completed execution.

This sample workflow is available as part of the pre-built workflows to provide an example of TCSA integration.

Workflow Execution

The workflow can be executed with the following input.
Input.Json
{
  "start": {
    "site_id": "esx-192-168-146-69",
    "fsm_state": "S2",
    "fsm_status": "Started",
    "error_msg": "AIR142027",
    "additional_data": {
      "task": "PushTemplateToESXiHost",
      "task_status": "Ongoing",
      "event": "Started",
      "dagrunid": "a69esx-192-168-146-692022-11-21 08:57:40.423437esx-192-168-146-692022-11-21 09:03:41.454028",
      "clustername": "None",
      "dashboard": {
        "aoi": "RIV",
        "csr_server_type": "LIT",
        "dag_run": "gharidas_auto_26051",
        "du_vendor": "Samsung",
        "market": "LAB",
        "region": "Central",
        "server_model": "Dell XR11",
        "siteId": "esx-192-168-146-69"
      }
    }
  },
  "end": {
    "site_id": "esx-192-168-146-69",
    "fsm_state": "S2",
    "fsm_status": "Failed",
    "error_msg": "AIR142027",
    "additional_data": {
      "task": "PushTemplateToESXiHost",
      "task_status": "Failed",
      "event": "ended",
      "dagrunid": "a69esx-192-168-146-692022-11-21 08:57:40.423437esx-192-168-146-692022-11-21 09:03:41.454028",
      "clustername": "None",
      "dashboard": {
        "aoi": "RIV",
        "csr_server_type": "LIT",
        "dag_run": "gharidas_auto_26051",
        "du_vendor": "Samsung",
        "market": "LAB",
        "region": "Central",
        "server_model": "Dell XR11",
        "siteId": "esx-192-168-146-69"
      },
      "logs": "Unable to connect to url. Please cross check the url and expiry time if any HTTP Error 400: Bad Request"
    }
  }
}

In a production workflow where TCSA is to be integrated, some parameters, such as error_msg, fsm_state, fsm_status, and timestamp, are generated from within the workflow. Other parameters, such as dashboard, come from the input.
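
For instance, the timestamp can be produced by a jq expression inside the workflow rather than read from the input, as the templating example later in this document does. A sketch of the Start_workflow action with an in-workflow timestamp:
  - name: Start_workflow
    type: operation
    actions:
      - functionRef:
          refName: PublishWorkflowStatus
          arguments:
            site_id: "${ .start.site_id }"
            fsm_state: "${ .start.fsm_state }"
            fsm_status: "${ .start.fsm_status }"
            error_msg: "${ .start.error_msg }"
            timestamp: '${ now | strftime("%Y-%m-%d %H:%M:%S.%f") }'   # generated in-workflow
            additional_data: "${ .start.additional_data }"
            Content-Type: application/json
    transition: Sleep1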

Workflow Failures

There are two types of errors:

  • When the arguments passed to the AsyncAPI function do not satisfy the schema requirements, an error is raised and passed to the workflow, and the workflow execution fails (see the example below).
  • When the message is properly constructed, there can still be communication errors between the Kafka broker and Workflow Hub, resulting in a message transmission failure. Message transmission failures are logged, but they do not fail the workflow.
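
For example, with the schema above, the first hypothetical argument below fails validation (it does not match the error_msg pattern '^(None|AIR|CAAS|CNF|ZTP).+$', so the workflow fails), while the second passes and the message is published:
{ "error_msg": "XYZ123" }
{ "error_msg": "AIR142027" }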

DNS Update for TCSA-Integration

The TCSA Kafka server name is hard-coded to kafka-edge in every deployment. Consequently, the server certificate presented by the TCSA Kafka broker is for the host name kafka-edge. This host is not routable without a DNS entry that resolves it to the corresponding IP address, so TCA's core-dns module must have an entry for kafka-edge. This is currently a manual step, and it is required whenever integrating with TCSA over secure Kafka.
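
The exact procedure depends on the TCA deployment. As an illustration only, a CoreDNS hosts entry of the following shape (hypothetical ConfigMap name, namespace, and IP; the IP is a placeholder taken from the sample input in this document) resolves kafka-edge to the TCSA Kafka broker:
apiVersion: v1
kind: ConfigMap
metadata:
  name: coredns              # name and namespace vary by deployment
  namespace: kube-system
data:
  Corefile: |
    .:53 {
        errors
        hosts {
            10.220.226.116 kafka-edge   # TCSA Kafka broker IP (placeholder)
            fallthrough
        }
        forward . /etc/resolv.conf
        cache 30
    }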

Templating Support

VMware Telco Cloud Automation 3.1.0 supports templating of AsyncAPI schemas. Template variables, such as the server host and port, are supplied as workflow arguments at run time.

A sample templated AsyncAPI schema is given below:
asyncapi: 2.1.0
id: urn:com:vmware:sebu:tca:wfh:tcsaintegration
info:
  title: TCSA Integration Message
  version: 1.0.0
  description: |
    This file describes the API through which WFH would report the status of workflows to TCSA.
  termsOfService: https://www.vmware.com/content/dam/digitalmarketing/vmware/en/pdf/product/vmware-product-guide.pdf
  license:
    name: VMware License
    url: https://www.vmware.com
  contact:
    email: [email protected]
    name: VMware
    url: https://www.vmware.com
 
servers:
  local-kafka-sasl-secure:
    url: '{server}:{port}'
    variables:
      server:
        description: This value is assigned by the service provider and is supplied as a workflow argument
      port:
        description: The port used for communication
    protocol: kafka
    protocolVersion: '7.4.0-ccs'
    description: This kafka broker is encrypted with TLS and uses SCRAM-512 based authentication over the SASL_SSL protocol
    bindings:
      kafka:
        sasl_username:
          type: string
        sasl_password:
          type: string
        sasl_mechanism: 'PLAIN'
        security_protocol:
          type: string
 
 
defaultContentType: application/json
 
channels:
  tcsastatuschannel:
    bindings:
      kafka:
        topic: tca-stages
    description: This channel is used by WFH to report the progress of the workflow
    publish:
      operationId: sendupdatetotcsa
      summary: Sends update to TCSA
      description: workflows can use this schema to send updates to TCSA about the progress of WF execution
      message:
        headers:
          $ref: "#/components/schemas/headers"
        payload:
          type: object
          properties:
            site_id:
              type: integer
              maximum: 3
              minimum: 1
            fsm_state:
              type: number
              maximum: 0.4
              minimum: 0.1
          required:
            - site_id
            - fsm_state
 
components:
  schemas:
    headers:
      type: object
      properties:
        fsm_status:
          type: boolean
          default: false
        abc:
          type: boolean
          default: false
        error_msg:
          type: string
          pattern: '^(None|AIR|CAAS|CNF|ZTP).+$'
          minLength: 3
          maxLength: 10
        timestamp:
          type: string
          format: date-time
      required:
        - fsm_status
        - error_msg
        - timestamp
    additionaldata:
      type: object
      properties:
        task:
          type: string
        task_status:
          type: string
        event:
          type: string
        runId:
          type: string
        clustername:
          type: string
        dashboard:
          $ref: "#/components/schemas/dashboard"
        logs:
          type: string
    dashboard:
      type: object
      properties:
        aoi:
          type: string
        csr_server_type:
          type: string
        dag_run:
          type: string
        du_vendor:
          type: string
        market:
          type: string
        region:
          type: string
        server_model:
          type: string
        siteId:
          type: string
  securitySchemes:
    kafka_security:
      type: plain
A sample workflow demonstrating templating support is given below:
id: templatedemo
name: templatedemo
version: 0.1.0
description: >-
  Workflow to demonstrate the Event Publishing to Kafka using an async api template.
specVersion: 0.7.0
start: Start_workflow
functions:
  - name: PublishWorkflowStatus
    operation: 'db://async-api/2.1.0/template/demo/tcsa.json#sendupdatetotcsa'
    type: asyncapi
    metadata:
      tlsVerify: false
states:
  - name: Start_workflow
    type: operation
    actions:
      - functionRef:
          refName: PublishWorkflowStatus
          arguments:
            site_id: '${ .start.site_id }'
            fsm_state: '${ .start.fsm_state }'
            fsm_status: '${ .start.fsm_status }'
            error_msg: '${ .start.error_msg }'
            timestamp: '${ now | strftime("%Y-%m-%d %H:%M:%S.%f")}'
            additional_data: '${ .start.additional_data }'
            server: '${ .start.server }'
            port: '${ .start.port }'
            Content-Type: application/json
    stateDataFilter:
      output: '${ .timestamp1 = (now | strftime("%Y-%m-%d %H:%M:%S.%f")) }'
    transition: Sleep1
  - name: Sleep1
    type: sleep
    duration: PT1S
    transition: End_workflow
  - name: End_workflow
    type: operation
    actions:
      - functionRef:
          refName: PublishWorkflowStatus
          arguments:
            site_id: '${ .end.site_id }'
            fsm_state: '${ .end.fsm_state }'
            fsm_status: '${ .end.fsm_status }'
            error_msg: '${ .end.error_msg }'
            timestamp: '${ now | strftime("%Y-%m-%d %H:%M:%S.%f")}'
            additional_data: '${ .end.additional_data }'
            server: '${ .end.server }'
            port: '${ .end.port }'
            Content-Type: application/json
    stateDataFilter:
      output: '${ .timestamp2 = (now | strftime("%Y-%m-%d %H:%M:%S.%f")) }'
    end: true
A sample input is provided below:
{
    "start": {
        "site_id": "esx-192-168-146-69",
        "fsm_state": "S2",
        "fsm_status": "Started",
        "server": "10.220.226.116",
        "port": "9092",
        "server":            "localhost",
        "sasl_username":     "admin",
        "sasl_password":     "YWRtaW4tc2VjcmV0",
        "security_protocol": "SASL_PLAINTEXT",        
        "error_msg": "AIR7891",
        "additional_data": {
            "task": "PushTemplateToESXiHost",
            "task_status": "Ongoing",
            "event": "Started",
            "dagrunid": "a69esx-192-168-146-692022-11-21 08:57:40.423437esx-192-168-146-692022-11-21 09:03:41.454028",
            "clustername": "None",
            "dashboard": {
                "aoi": "RIV",
                "csr_server_type": "LIT",
                "dag_run": "gharidas_auto_26051",
                "du_vendor": "Samsung",
                "market": "LAB",
                "region": "Central",
                "server_model": "Dell XR11",
                "siteId": "esx-192-168-146-69"
            }
        }
    },
    "end": {
        "site_id": "esx-192-168-146-69",
        "fsm_state": "S2",
        "fsm_status": "Failed",
        "server": "10.220.226.116",
        "port": "9092",
        "error_msg": "AIR7891",
        "additional_data": {
            "task": "PushTemplateToESXiHost",
            "task_status": "Failed",
            "event": "ended",
            "dagrunid": "a69esx-192-168-146-692022-11-21 08:57:40.423437esx-192-168-146-692022-11-21 09:03:41.454028",
            "clustername": "None",
            "dashboard": {
                "aoi": "RIV",
                "csr_server_type": "LIT",
                "dag_run": "gharidas_auto_26051",
                "du_vendor": "Samsung",
                "market": "LAB",
                "region": "Central",
                "server_model": "Dell XR11",
                "siteId": "esx-192-168-146-69"
            },
            "logs": "Unable to connect to url. Please cross check the url and expiry time if any HTTP Error 400: Bad Request"
        }
    }
}
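
With this input, Workflow Hub substitutes the server variables declared in the templated schema before connecting. Illustratively, with server=10.220.226.116 and port=9092 from the input:
url: '{server}:{port}'        # template in the schema
url: '10.220.226.116:9092'    # resolved value used for the connection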