Kafka Mapper is another option in VMware Telco Cloud Service Assurance for bringing data into the system for further analysis.

It connects to the source Kafka broker, taps data from a given topic, extracts and transforms the data into the VMware Telco Cloud Service Assurance object model, and then stores it in VMware Telco Cloud Service Assurance.

The Kafka Mapper approach consists of:
  1. Consuming JSON data from the customer Kafka topic.
  2. Converting data to the VMware Telco Cloud Service Assurance object model.
  3. Processing data based on user-defined mapping to the VMware Telco Cloud Service Assurance defined JSON format.
  4. Validating the processed records and sending them to the internal VMware Telco Cloud Service Assurance Kafka for further consumption (for example, reporting).
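The four steps above can be sketched as a small pipeline. This is only an illustration in plain Python with no Kafka client: the input list stands in for the customer topic, the returned list stands in for the internal topic, and the target field names ("instance", "timestamp") are hypothetical, not the actual VMware Telco Cloud Service Assurance model.

```python
import json

# Hypothetical required fields of the target record (an assumption for this sketch).
REQUIRED_FIELDS = ("instance", "timestamp")


def consume(raw_messages):
    """Step 1: parse each raw message as JSON, skipping anything that is not a JSON object."""
    for raw in raw_messages:
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            continue
        if isinstance(parsed, dict):
            yield parsed


def transform(record):
    """Steps 2-3: convert a source record into the hypothetical target shape."""
    return {
        "instance": record.get("host", {}).get("name"),
        "timestamp": record.get("@timestamp"),
    }


def is_valid(mapped):
    """Step 4 (validation): every required field must be present and non-empty."""
    return all(mapped.get(field) for field in REQUIRED_FIELDS)


def run(raw_messages):
    """Step 4 (sending): collect valid records as JSON strings."""
    out = []
    for record in consume(raw_messages):
        mapped = transform(record)
        if is_valid(mapped):
            out.append(json.dumps(mapped))
    return out


messages = [
    '{"host": {"name": "vl-vm-ic762"}, "@timestamp": "2021-02-10T00:02:42.705Z"}',
    "not json",
    '{"host": {}}',
]
sent = run(messages)
```

In this sketch only the first message reaches the output; the malformed message is dropped at step 1 and the incomplete one fails validation at step 4.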

Mapping

To comply with the VMware Telco Cloud Service Assurance data model, you must provide a mapping that is used to process incoming records. Properties can be mapped using JSON Path expressions.
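As a rough illustration of what such a mapping does, the sketch below pairs target property names with path expressions and resolves them against a record. It is plain Python and handles only simple dot-notation paths such as $.host.name, not filters or functions; the target names ("instance", "type", "port") and the helper names are hypothetical.

```python
def resolve(record, path):
    """Resolve a simple '$.a.b' path against nested dicts; return None if a key is absent."""
    if not path.startswith("$."):
        raise ValueError(f"unsupported path: {path}")
    current = record
    for key in path[2:].split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def apply_mapping(record, mapping):
    """Build the target record by resolving every mapped path."""
    return {target: resolve(record, path) for target, path in mapping.items()}


# Hypothetical mapping: target property -> path into the incoming record.
MAPPING = {
    "instance": "$.host.name",
    "type": "$.network.type",
    "port": "$.destination.port",
}

record = {
    "host": {"name": "vl-vm-ic762"},
    "network": {"type": "ipv6"},
    "destination": {"port": 2282},
}
mapped = apply_mapping(record, MAPPING)
```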

Operators

The following operators are supported:

Operator    Description
==          left is equal to right (note that 1 is not equal to '1')
!=          left is not equal to right
<           left is less than right
<=          left is less than or equal to right
>           left is greater than right
>=          left is greater than or equal to right
=~          left matches regular expression [?(@.name =~ /foo.*?/i)]
in          left exists in right [?(@.size in ['S', 'M'])]
nin         left does not exist in right
subsetof    left is a subset of right [?(@.sizes subsetof ['S', 'M', 'L'])]
anyof       left has an intersection with right [?(@.sizes anyof ['M', 'L'])]
noneof      left has no intersection with right [?(@.sizes noneof ['M', 'L'])]
size        size of left (array or string) should match right
empty       left (array or string) should be empty

The following JSON payload is used to illustrate some of the capabilities of the mapping:
{
   "network":{
      "type":"ipv6",
      "direction":"inbound-XYZ",
      "iana_number":41
   },
   "error":{
      "code":"process has exited. inode=0, tcp_state=TIME-WAIT"
   },
   "user":{
      "name":"root",
      "full_name":"root",
      "id":"0"
   },
   "destination":{
      "ip":"127.0.0.1",
      "port":2282
   },
   "service":{
      "type":"system"
   },
   "event":{
      "duration":17345372,
      "dataset":"system.socket",
      "module":"system"
   },
   "tags":[
      "beats_input_raw_event"
   ],
   "ecs":{
      "version":"1.6.0"
   },
   "host":{
      "architecture":"x86_64",
      "os":{
         "name":"Red Hat Enterprise Linux Server",
         "family":"redhat",
         "platform":"rhel",
         "version":"7.9 (Maipo)",
         "codename":"Maipo",
         "kernel":"3.10.0-1160.6.1.el7.x86_64"
      },
      "containerized":false,
      "name":"vl-vm-ic762",
      "id":"d69e0181566b99b60326991cad162e19",
      "ip":[
         "10.247.152.27",
         "fe80::f816:3eff:fe63:615f"
      ],
      "mac":[
         "fa:16:3e:63:61:5f"
      ],
      "hostname":"vl-vm-ic762"
   },
   "@timestamp":"2021-02-10T00:02:42.705Z",
   "source":{
      "ip":"127.0.0.1",
      "port":34086
   },
   "metricset":{
      "period":300000,
      "name":"socket"
   },
   "agent":{
      "name":"vl-vm-ic762",
      "type":"metricbeat",
      "ephemeral_id":"64506ba5-3ea9-4a76-a0d9-7b1d369cc807",
      "id":"705840f2-3674-4c2e-9a70-081042d34ee1",
      "version":"7.10.0",
      "hostname":"vl-vm-ic762"
   },
   "@version":"1",
   "system":{
      "socket":{
         "remote":{
            "ip":"127.0.0.1",
            "port":34086
         },
         "local":{
            "ip":"127.0.0.1",
            "port":2282
         }
      }
   }
}
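
To make the semantics of the set-style operators listed earlier concrete, here is a minimal Python sketch that evaluates a few of them against a trimmed copy of the payload above. The helper names (op_in, op_anyof, and so on) are illustrative only and are not part of any JSON Path library.

```python
# Subset of the sample payload, trimmed to the fields used here.
payload = {
    "network": {"type": "ipv6", "iana_number": 41},
    "user": {"id": "0"},
    "tags": ["beats_input_raw_event"],
    "host": {"ip": ["10.247.152.27", "fe80::f816:3eff:fe63:615f"]},
}


def op_in(left, right):
    """in: left exists in right."""
    return left in right


def op_nin(left, right):
    """nin: left does not exist in right."""
    return left not in right


def op_subsetof(left, right):
    """subsetof: every item of left is also in right."""
    return set(left) <= set(right)


def op_anyof(left, right):
    """anyof: left and right share at least one item."""
    return bool(set(left) & set(right))


def op_noneof(left, right):
    """noneof: left and right share no item."""
    return not (set(left) & set(right))


def op_size(left, right):
    """size: the length of left (array or string) equals right."""
    return len(left) == right


def op_empty(left):
    """empty: left (array or string) has no items."""
    return len(left) == 0


results = {
    "in": op_in(payload["network"]["type"], ["ipv4", "ipv6"]),
    "nin": op_nin(payload["network"]["type"], ["ipv4"]),
    "subsetof": op_subsetof(payload["tags"], ["beats_input_raw_event", "other"]),
    "anyof": op_anyof(payload["tags"], ["other"]),
    "noneof": op_noneof(payload["tags"], ["other"]),
    "size": op_size(payload["host"]["ip"], 2),
    "empty": op_empty(payload["tags"]),
    # user.id is the string "0", so comparing it with the number 0 is false,
    # in line with the note that 1 is not equal to '1'.
    "==": payload["user"]["id"] == 0,
}
```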

Expressions

The following expressions can be used with the operators listed above:
  1. Contains:
    $.network[?(@.type =~ /^.*ip.*$/i)]

    The expression looks for the element "type" under "network" that contains the "ip" string.

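  2. Match on prefix:
    $.network[?(@.direction =~ /^.*inbound-.*$/i)]

    The expression looks for the element "direction" under "network" that contains the "inbound-" string. Because the pattern starts with ".*", it also matches values where "inbound-" is not at the start; use /^inbound-.*$/i to match on the prefix only.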

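  3. Match on suffix:
    $.network[?(@.direction =~ /^.*-outbound.*$/i)]

    The expression looks for the element "direction" under "network" that contains the "-outbound" string. Because the pattern ends with ".*", it also matches values where "-outbound" is not at the end; use /^.*-outbound$/i to match on the suffix only.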

  4. Match on numeric value:
    $.network[?(@.iana_number > 10)]

    The expression looks for the element "iana_number" under "network" that has a value greater than 10.

  5. Match on string value:
    $.network[?(@.type == 'ipv6')]

    The expression looks for the element "type" under "network" that is exactly the "ipv6" string.

  6. Match on value or suffix regex:
    $.network[?(@.iana_number < 10 || @.direction =~ /^.*-outbound.*$/i)]

    The expression looks for the element "iana_number" smaller than 10 or for the element "direction" that contains the "-outbound" string, both under "network". Either condition is enough for a match.
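
To check what each predicate in the list above evaluates to for the sample "network" object, the conditions can be reproduced in plain Python. This is only a sketch of the predicate logic using the standard re module, not a JSON Path evaluator; the regex_match helper is a name chosen for this example.

```python
import re

# The "network" object from the sample payload.
network = {"type": "ipv6", "direction": "inbound-XYZ", "iana_number": 41}


def regex_match(value, pattern):
    """Case-insensitive match of the whole value, like a /.../i pattern."""
    return re.fullmatch(pattern, value, re.IGNORECASE) is not None


suffix = regex_match(network["direction"], r"^.*-outbound.*$")

checks = {
    "contains": regex_match(network["type"], r"^.*ip.*$"),
    "prefix": regex_match(network["direction"], r"^.*inbound-.*$"),
    "suffix": suffix,
    "numeric": network["iana_number"] > 10,
    "string": network["type"] == "ipv6",
    # Expression 6 combines its two conditions with ||, a logical OR.
    "value_or_suffix": network["iana_number"] < 10 or suffix,
}
```

For this payload the suffix check and the combined check are false, because "direction" is "inbound-XYZ" and "iana_number" is 41; the other four checks are true.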

Note: For more information, refer to the JSONPath documentation. You can evaluate the expressions using the Jayway JsonPath Evaluator open source tool.

Functions

You can invoke functions at the tail end of a path: the input to a function is the output of the path expression, and the function output is dictated by the function itself. To use a function, place it after the path and pass any arguments in parentheses. For example: $.store.book.length(). With the sample payload above, $.host.ip.length() returns 2.

The following table lists the functions supported by the JSON Path library:

Function    Description                                                         Return type
min()       Provide the minimum value of an array of numbers.                   Double
max()       Provide the maximum value of an array of numbers.                   Double
avg()       Provide the average value of an array of numbers.                   Double
stddev()    Provide the standard deviation value of an array of numbers.        Double
length()    Provide the length of an array.                                     Integer
sum()       Provide the sum value of an array of numbers.                       Double
keys()      Provide the property keys (an alternative for terminal tilde ~).    Set<E>
concat(X)   Provide a concatenated version of the path output with a new item.  Same as input
append(X)   Add an item to the JSON path output array.                          Same as input
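
The aggregate functions in the table can be approximated in Python as follows. This is a sketch, not the library's implementation: it assumes stddev() is a population standard deviation, it omits concat(X) and append(X), and the input list simply gathers the two port numbers from the sample payload for illustration.

```python
import statistics

# Python stand-ins for the functions in the table; numeric results are
# converted to float to mirror the Double return type.
FUNCTIONS = {
    "min": lambda v: float(min(v)),
    "max": lambda v: float(max(v)),
    "avg": lambda v: float(sum(v)) / len(v),
    "stddev": lambda v: float(statistics.pstdev(v)),  # assumption: population stddev
    "sum": lambda v: float(sum(v)),
    "length": lambda v: len(v),
    "keys": lambda v: set(v.keys()),
}


def apply_function(name, value):
    """Apply a named function to the output of a path expression."""
    return FUNCTIONS[name](value)


# destination.port and source.port from the sample payload.
ports = [2282, 34086]

summary = {name: apply_function(name, ports) for name in ("min", "max", "avg", "sum", "length")}
destination_keys = apply_function("keys", {"ip": "127.0.0.1", "port": 2282})
```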

Custom Functions

Kafka Mapper includes a custom function, transform_date, for date transformation. It takes a String input parameter that denotes the source date format. For example: $.timestamp.transform_date(\"yyyy-MM-dd HH:mm\").

If the date format is present in the JSON data itself, then use $.timestamp.transform_date($.dateFormat).

Some of the supported date-time formats are:

Format                          Example
yyyy-MM-dd*HH:mm:ss             2022-07-04*13:23:55
yyyy-MM-dd*HH:mm:ss:SSS         2022-10-30*02:47:33:899
yy-MM-dd HH:mm:ss SSS           22-06-26 02:31:29 573
yyyy MMM dd HH:mm:ss.SSS*zzz    2018 Apr 13 22:08:13.211*PDT

Note: Other supported and pre-defined date time formats are present in the Java 11 date time library.
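
To show how a source date format drives parsing, the sketch below translates a small subset of the Java-style pattern letters used in the formats above into Python strptime directives and parses a value with the result. It is not the transform_date implementation: what that function returns is not described here, so the sketch stops at a parsed datetime. The function names are hypothetical, and time zone letters such as zzz are deliberately unsupported.

```python
from datetime import datetime

# Java-style pattern tokens and their strptime equivalents. Longer tokens come
# first so that "yyyy" is tried before "yy" and "MMM" before "MM".
_TOKENS = [
    ("yyyy", "%Y"), ("MMM", "%b"), ("SSS", "%f"),
    ("yy", "%y"), ("MM", "%m"), ("dd", "%d"),
    ("HH", "%H"), ("mm", "%M"), ("ss", "%S"),
]


def to_strptime(java_pattern):
    """Translate a supported Java-style pattern into a strptime format string."""
    out, i = [], 0
    while i < len(java_pattern):
        for token, directive in _TOKENS:
            if java_pattern.startswith(token, i):
                out.append(directive)
                i += len(token)
                break
        else:
            ch = java_pattern[i]
            if ch.isalpha():
                raise ValueError(f"unsupported pattern letter {ch!r}")
            # Non-letter characters such as '-', ':' and '*' are kept as literals.
            out.append("%%" if ch == "%" else ch)
            i += 1
    return "".join(out)


def parse_with_pattern(value, java_pattern):
    """Parse a date string using a Java-style source pattern."""
    return datetime.strptime(value, to_strptime(java_pattern))


parsed = parse_with_pattern("2022-10-30*02:47:33:899", "yyyy-MM-dd*HH:mm:ss:SSS")
```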