The following example shows input JSON data and a Kafka Mapper whose JSONPath expressions process that data. The output is the result of applying the Kafka Mapper to the input.
Input | Kafka Mapper | Output |
---|---|---|
{ "network":{ "type":"ipv6", "direction":"inbound-XYZ", "iana_number":41 }, "newnetwork":{ "type":"ip", "direction":"ABC-outbound", "iana_number":6 }, "error":{ "code":"process has exited. inode=0, tcp_state=TIME-WAIT" }, "user":{ "name":"root", "full_name":"root", "id":"0" }, "source":{ "ip":"10.106.125.152", "port":22 }, "destination":{ "ip":"127.0.0.1", "port":2282 }, "service":{ "type":"system" }, "event":{ "duration":17345372, "dataset":"system.socket", "module":"system" }, "tags":[ "vm_i_raw_event" ], "ecs":{ "version":"1.6.0" }, "host":{ "architecture":"x86_64", "os":{ "name":"Red Hat Enterprise Linux Server", "family":"redhat", "platform":"rhel", "version":"7.9 (Maipo)", "codename":"Maipo", "kernel":"3.10.0-1160.6.1.el7.x86_64" }, "containerized":false, "name":"vl-vm-ic762", "id":"d69e0181566b99b60326991cad162e19", "ip":[ "10.247.152.27", "fe80::f816:3eff:fe63:615f" ], "mac":[ "fa:16:3e:63:61:5f" ], "hostname":"vl-vm-ic762" }, "@timestamp":"2021-02-10T00:02:42.705Z", "metricset":{ "period":300000, "name":"socket" }, "agent":{ "name":"vl-vm-ic762", "type":"metricbeat", "ephemeral_id":"64506ba5-3ea9-4a76-a0d9-7b1d369cc807", "id":"705840f2-3674-4c2e-9a70-081042d34ee1", "version":"7.10.0", "hostname":"vl-vm-ic762" }, "@version":"1", "system":{ "socket":{ "remote":{ "ip":"127.0.0.1", "port":34086 }, "local":{ "ip":"127.0.0.1", "port":2282 } } } } | { "metricType": "$.agent.type", "instance": "$.agent.name", "properties.entityName": "$.network[?(@.direction =~ /^.*inbound-.*$/i)].direction", "properties.entityType": "$.network[?(@.type == 'ipv6')].type", "processedTimestamp": "$.@timestamp", "metrics": "$.metricset", "type": "KafkaCollector-One", "properties.dataSource": "$.source.ip", "properties.deviceName": "$.newnetwork[?(@.direction =~ /^.*-outbound.*$/i)].direction", "timestamp": "$.@timestamp", "properties.deviceType": "$.newnetwork[?(@.type =~ /^.*ip.*$/i)].type", "tags": "$.tags" } | { "instance":"vl-vm-ic762", "metricType":"metricbeat", "timestamp":1612895562705, "processedTimestamp":1612895562705, "type":"KafkaCollector-One", "metrics":{ "period":300000.0 }, "properties":{ "deviceType":"ip", "entityName":"inbound-XYZ", "entityType":"ipv6", "dataSource":"10.106.125.152", "deviceName":"ABC-outbound" }, "tags":{ } } |
- As per the schema defined in #GUID-6A05CFFF-BDC1-4F18-9365-8A12C327D12B, metricType is an attribute of the VMware Telco Cloud Service Assurance Metric schema. Each key of the Kafka Mapper refers either to an attribute of that schema or to a property, metric, or tag name.
In the first line of the Kafka Mapper, `"metricType": "$.agent.type"`, the key `metricType` refers to an attribute of the VMware Telco Cloud Service Assurance Metric schema, and the value `"$.agent.type"` is a JSONPath expression. During processing, the JSONPath expression is evaluated against the input data, and the value it retrieves is assigned to the metricType attribute in the output.
In this case, `$` refers to the root node of the input. From the root node, `$.agent` looks up the agent key within the input JSON, so the current context becomes the agent node. In the example, `$.agent` resolves to:
`{ "name":"vl-vm-ic762", "type":"metricbeat", "ephemeral_id":"64506ba5-3ea9-4a76-a0d9-7b1d369cc807", "id":"705840f2-3674-4c2e-9a70-081042d34ee1", "version":"7.10.0", "hostname":"vl-vm-ic762" }`
`$.agent.type` resolves to metricbeat, and the output contains:
`{ "metricType" : "metricbeat" }`
- Similarly, `$.agent.name` resolves to vl-vm-ic762, and the output contains:
`{ "instance" : "vl-vm-ic762" }`
- properties.entityName: Any key that contains a '.' (dot) and whose left part is metrics, properties, or tags is split at the dot. The right part is treated as the attribute name within the node identified by the left part.
For this key in #GUID-6A05CFFF-BDC1-4F18-9365-8A12C327D12B, entityName is the attribute name within the properties node. The Kafka Mapper computes the value of this attribute with the configured JSONPath expression:
`$.network[?(@.direction =~ /^.*inbound-.*$/i)].direction`
The expression starts at `$.network` and applies the JSONPath filter `[?(@.direction =~ /^.*inbound-.*$/i)]`, which ensures that only the nodes matching the filter are selected. The filter expression is `@.direction =~ /^.*inbound-.*$/i`, where `@` denotes the current node and direction is the attribute whose value is tested. The attribute value in the input data is matched against the regular expression `/^.*inbound-.*$/`, which matches any value that contains inbound-; the trailing `i` makes the match case-insensitive.
Nodes that match are selected. If more than one node matches, multiple values are returned as strings. In this example, a single node matches:
`{ "type":"ipv6", "direction":"inbound-XYZ", "iana_number":41 }`
The trailing `.direction` selects the direction attribute of this node, so the overall expression resolves to inbound-XYZ.
Therefore, the overall line:
`{ "properties.entityName": "$.network[?(@.direction =~ /^.*inbound-.*$/i)].direction" }`
Resolves to:
`{ "properties":{ "entityName":"inbound-XYZ" } }`
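The regex filter described above can be approximated in a few lines of Python. This is only an illustrative sketch using the standard `re` module, not the collector's actual JSONPath engine; the helper name `select_by_regex` is hypothetical, and treating a single object as a one-element candidate list is an assumption made to mirror the behaviour shown in this example.

```python
import re

def select_by_regex(node, attr, pattern, flags=0):
    """Return the `attr` values of the candidate nodes whose `attr` matches `pattern`.

    `node` may be one object (dict) or a list of objects, so the result is
    always a list: one entry per matching node.
    """
    candidates = node if isinstance(node, list) else [node]
    return [
        c[attr]
        for c in candidates
        if isinstance(c, dict)
        and isinstance(c.get(attr), str)
        and re.search(pattern, c[attr], flags)
    ]

event = {
    "network": {"type": "ipv6", "direction": "inbound-XYZ", "iana_number": 41},
    "newnetwork": {"type": "ip", "direction": "ABC-outbound", "iana_number": 6},
}

# Approximates $.network[?(@.direction =~ /^.*inbound-.*$/i)].direction
entity_name = select_by_regex(
    event["network"], "direction", r"^.*inbound-.*$", re.IGNORECASE
)
print(entity_name)  # ['inbound-XYZ']

# The same filter applied to newnetwork selects no node.
no_match = select_by_regex(
    event["newnetwork"], "direction", r"^.*inbound-.*$", re.IGNORECASE
)
print(no_match)  # []
```

The helper returns a list because a filter can select several nodes; with a single match, as here, the list holds one value.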
- properties.entityType: The explanation is the same as for properties.entityName. The only difference is that the filter uses the `==` operator instead of a regular expression, so the value of the type attribute must match 'ipv6' exactly. The overall line:
`{ "properties.entityType": "$.network[?(@.type == 'ipv6')].type" }`
Resolves to:
`{ "properties":{ "entityType":"ipv6" } }`
- processedTimestamp: Refers to an attribute in the VMware Telco Cloud Service Assurance Metric schema that indicates when the collector processed the metric. `$` refers to the root node and `@` refers to the current node. `$.@timestamp` and `$.timestamp` produce similar results, and the JSONPath expression resolves to:
`{ "processedTimestamp": 1612895562705 }`
Note: Timestamps are resolved to epoch timestamps in the output. This conversion applies only to the processedTimestamp and timestamp attributes of the VMware Telco Cloud Service Assurance Metric.
- metrics: The value string in this line is a JSONPath expression that resolves to a JSON node instead of a value string. The input line:
`{ "metrics": "$.metricset" }`
would resolve to:
`{ "metricset":{ "period":300000, "name":"socket" } }`
However, the key-value pair with the key name has the value socket, which cannot be converted to a float, so this pair is ignored. The output contains only the metrics node, which holds the key period with the value 300000.0:
`{ "metrics":{ "period":300000.0 } }`
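The float conversion described for the metrics node can be sketched as follows. This is a minimal illustration under the assumption that any value Python's `float()` accepts counts as convertible; the function name `to_metrics` is hypothetical and is not part of the product.

```python
def to_metrics(node):
    """Keep only the entries whose value converts to float; ignore the rest."""
    metrics = {}
    for key, value in node.items():
        try:
            metrics[key] = float(value)
        except (TypeError, ValueError):
            # Values such as "socket" cannot become a float, so the pair is dropped.
            continue
    return metrics

metricset = {"period": 300000, "name": "socket"}
metrics = to_metrics(metricset)
print({"metrics": metrics})  # {'metrics': {'period': 300000.0}}

# A float given as a string is accepted as well.
print(to_metrics({"load": "0.75"}))  # {'load': 0.75}
```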
- type: Refers to the hardcoded value KafkaCollector-One. The output contains the same string value irrespective of the input:
{ "type":"KafkaCollector-One" }
- properties.dataSource: The explanation is the same as for properties.entityName. The input line:
`{ "properties.dataSource": "$.source.ip" }`
would resolve to:
`{ "properties":{ "dataSource":"10.106.125.152" } }`
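The handling of plain paths, hardcoded values, and dotted keys can be sketched together in Python. This is an illustrative approximation only: `resolve_path` and `apply_mapper` are hypothetical helpers, the path resolver supports only simple dotted paths without filters, and treating any mapper value that starts with `$` as a JSONPath expression (and anything else as a literal) is an assumption.

```python
NESTED_NODES = ("metrics", "properties", "tags")

def resolve_path(doc, path):
    """Resolve a simple dotted path such as '$.source.ip' (filters not supported)."""
    node = doc
    for key in path.split(".")[1:]:  # skip the leading '$' (root node)
        node = node[key]
    return node

def apply_mapper(doc, mapper):
    """Build the output by evaluating each mapper entry against the input."""
    out = {}
    for key, value in mapper.items():
        # Assumption: values starting with '$' are JSONPath, others are literals.
        resolved = resolve_path(doc, value) if value.startswith("$") else value
        left, dot, right = key.partition(".")
        if dot and left in NESTED_NODES:
            # 'properties.dataSource' becomes out['properties']['dataSource'].
            out.setdefault(left, {})[right] = resolved
        else:
            out[key] = resolved
    return out

event = {
    "agent": {"name": "vl-vm-ic762", "type": "metricbeat"},
    "source": {"ip": "10.106.125.152", "port": 22},
}
mapper = {
    "metricType": "$.agent.type",
    "instance": "$.agent.name",
    "type": "KafkaCollector-One",
    "properties.dataSource": "$.source.ip",
}
result = apply_mapper(event, mapper)
print(result)
```

With the trimmed input above, `result` holds metricType, instance, and type at the top level, and dataSource nested under properties.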
- properties.deviceName: The explanation is the same as for properties.entityName. The input line:
`{ "properties.deviceName": "$.newnetwork[?(@.direction =~ /^.*-outbound.*$/i)].direction" }`
would resolve to:
`{ "properties":{ "deviceName":"ABC-outbound" } }`
- timestamp: The explanation is the same as for processedTimestamp. The input line:
`{ "timestamp": "$.@timestamp" }`
would resolve to:
`{ "timestamp":1612895562705 }`
- properties.deviceType: The explanation is the same as for properties.entityName. The input line:
`{ "properties.deviceType": "$.newnetwork[?(@.type =~ /^.*ip.*$/i)].type" }`
would resolve to:
`{ "properties":{ "deviceType":"ip" } }`
- tags: The explanation is the same as for metrics. The only difference is that, for metrics, each value within the node must be a float or a float represented as a string. Tags have no such restriction; the values are converted to strings and assigned. The input line:
`{ "tags": "$.tags" }`
would resolve to:
`{ "tags":{ } }`