Kafka Mapper is one more option available in VMware Telco Cloud Service Assurance that helps you to bring data into the system for further analysis.
It connects to the source Kafka broker and tap data from a given topic, extract and transform data into the VMware Telco Cloud Service Assurance object model, and then store it in VMware Telco Cloud Service Assurance
- Consuming JSON data from the customer Kafka topic.
- Converting data to VMware Telco Cloud Service Assurance object model.
- Processing data based on user-defined mapping to VMware Telco Cloud Service Assurance defined JSON format.
- Performing validation and send processed records to the internal VMware Telco Cloud Service Assurance Kafka for further consumption (For example: Reporting).
Mapping
In order to comply with VMware Telco Cloud Service Assurance data model, you must inform a mapping that will be used to process incoming records. Properties can be mapped using JSON Path expressions.
Operators
Operators | Description |
---|---|
== | left is equal to right (note that 1 is not equal to '1') |
!= | left is not equal to right |
< | left is less than right |
<= | left is less or equal to right |
> | left is greater than right |
>= | left is greater than or equal to right |
=~ | left matches regular expression [?(@.name =~ /foo.*?/i)] |
in | left exists in right [?(@.size in ['S', 'M'])] |
nin | left does not exists in right |
subsetof | left is a subset of right [?(@.sizes subsetof ['S', 'M', 'L'])] |
anyof | left has an intersection with right [?(@.sizes anyof ['M', 'L'])] |
noneof | left has no intersection with right [?(@.sizes noneof ['M', 'L'])] |
size | size of left (array or string) should match right |
empty | left (array or string) should be empty |
{ "network":{ "type":"ipv6", "direction":"inbound-XYZ", "iana_number":41 }, "error":{ "code":"process has exited. inode=0, tcp_state=TIME-WAIT" }, "user":{ "name":"root", "full_name":"root", "id":"0" }, "destination":{ "ip":"127.0.0.1", "port":2282 }, "service":{ "type":"system" }, "event":{ "duration":17345372, "dataset":"system.socket", "module":"system" }, "tags":[ "beats_input_raw_event" ], "ecs":{ "version":"1.6.0" }, "host":{ "architecture":"x86_64", "os":{ "name":"Red Hat Enterprise Linux Server", "family":"redhat", "platform":"rhel", "version":"7.9 (Maipo)", "codename":"Maipo", "kernel":"3.10.0-1160.6.1.el7.x86_64" }, "containerized":false, "name":"vl-vm-ic762", "id":"d69e0181566b99b60326991cad162e19", "ip":[ "10.247.152.27", "fe80::f816:3eff:fe63:615f" ], "mac":[ "fa:16:3e:63:61:5f" ], "hostname":"vl-vm-ic762" }, "@timestamp":"2021-02-10T00:02:42.705Z", "source":{ "ip":"127.0.0.1", "port":34086 }, "metricset":{ "period":300000, "name":"socket" }, "agent":{ "name":"vl-vm-ic762", "type":"metricbeat", "ephemeral_id":"64506ba5-3ea9-4a76-a0d9-7b1d369cc807", "id":"705840f2-3674-4c2e-9a70-081042d34ee1", "version":"7.10.0", "hostname":"vl-vm-ic762" }, "@version":"1", "system":{ "socket":{ "remote":{ "ip":"127.0.0.1", "port":34086 }, "local":{ "ip":"127.0.0.1", "port":2282 } } } }
Expressions
- Contains:
$.network[?(@.type =~ /^.*ip.*$/i)]
The expression looks for the element "type" under "network" that contains the "
ip
" string. - Match on prefix:
$.network[?(@.direction =~ /^.*inbound-.*$/i)
The expression looks for the element "direction" under "network" that contains the prefix "
inbound-
" string. - Match on suffix:
$.network[?(@.direction =~ /^.*-outbound.*$/i)
The expression looks for the element "direction" under "network" that contains the suffix "
-outbound
" string. - Match on numeric value:
$.network[?(@.iana_number > 10 )
The expression looks for the element "iana_number" under "network" that contains values greater than "10".
- Match on string value:
$.network[?(@.type == 'ipv6')
The expression looks for the element "type" under "network" that contains exactly the "ipv6" string.
- Match on value and suffix regex:
$.network[?(@.iana_number < 10 || @.direction =~ /^.*-outbound.*$/i)
The expression looks for the element "iana_number" smaller than "10" and for the element "direction" that contains the suffix "
-outbound
" string, both under "network".
Functions
You can enter the functions at the tail end of a path - the input to a function is the output of the path expression. The function output transcripts the function itself to use the function place after the path and argument, to pass the functions. For example: $.store.book.length().
Function | Description | Return type |
---|---|---|
min() | Provide the min value of an array of numbers. | Double |
max() | Provide the max value of an array of numbers. | Double |
avg() | Provide the average value of an array of numbers. | Double |
stddev() | Provide the standard deviation value of an array of numbers. | Double |
length() | Provide the length of an array. | Integer |
sum() | Provide the sum value of an array of numbers. | Double |
keys() | Provide the property keys (An alternative for terminal tilde ~ ). |
Set<E> |
concat(X) | Provide a concatenated version of the path output with a new item. | Same as input |
append(X) | Add an item to the json path output array. | Same as input |
Custom Functions
A custom function for date transform is present in the Kafka Mapper, which takes a String input parameter which denotes the source date format use of the function $.timestamp.transform_date(\"yyyy-MM-dd HH:mm\").
If the date format is present in the JSON data itself, then use $.timestamp.transform_date($.dateFormat)
Format | Example |
---|---|
yyyy-MM-dd*HH:mm:ss | 2022-07-04*13:23:55 |
yyyy-MM-dd*HH:mm:ss:SSS | 2022-10-30*02:47:33:899 |
yy-MM-dd HH:mm:ss SSS | 22-06-26 02:31:29 573 |
yyyy MMM dd HH:mm:ss.SSS*zzz | 2018 Apr 13 22:08:13.211*PDT |