The Complete custom kubernetes topology collector process with sample code.
To write custom collector, need to implement Batch Collector and override the definitions provided such as below:
1. Collect – collect data from any kubernetes cluster using k8s python client provided api's
In this, used below api to get topology data from kubernetes cluster pods and nodes:
api.list_pod_for_all_namespaces(watch=False, field_selector=field_selector)
2. Transform – transform collected data into topology objects i.e. Type of KubernetesPod & KubernetesWorker
Later, the transformed data publishes to Kafka bus for further processing
class TopologyCollector(BatchCollector): def __init__(self, logger, config: dict) -> None: self._config = config super(BatchCollector, self).__init__(logger, config) def collect(self): # coonect to k8s server using k8s python client by providing auth configuration api =client.CoreV1Api(client.ApiClient(configuration)) res = api.list_node() for node in res.items: field_selector = 'spec.nodeName=' + node.metadata.name ret = api.list_pod_for_all_namespaces(watch=False, field_selector=field_selector) for pod in ret.items: self.collected_data = pod self.transform() def transform(self): # transform the collected data to TCOMetric pod = KubernetesPod(discoveryID=discoveryID, timestamp=timestamp, name=name, type="KubernetesPod", domain=domain, Source=source, jobID=jobID, groupName=groupName, action=action) node = KubernetesWorker(discoveryID=discoveryID, timestamp=timestamp, name=name, type="KubernetesWorker", domain=domain, Source=Source, jobID=jobID, groupName=groupName, action=action, properties=properties, relations=relations) node.add_Contains(pod) pod.add_ContainedBy(node) self.publish(pod) return pod