The Complete custom kubernetes topology collector process with sample code.

To write custom collector, need to implement Batch Collector and override the definitions provided such as below:

1.    Collect – collect data from any kubernetes cluster using k8s python client provided api's

In this, used below api to get topology data from kubernetes cluster pods and nodes:

api.list_pod_for_all_namespaces(watch=False, field_selector=field_selector)

2.   Transform – transform collected data into topology objects i.e. Type of KubernetesPod & KubernetesWorker

Later, the transformed data publishes to Kafka bus for further processing

class TopologyCollector(BatchCollector):
    def __init__(self, logger, config: dict) -> None:    
        self._config = config    
        super(BatchCollector, self).__init__(logger, config)
    def collect(self):               
        # coonect to k8s server using k8s python client by providing 
          auth configuration
       api =client.CoreV1Api(client.ApiClient(configuration))
       res = api.list_node()
       for node in res.items:  
          field_selector = 'spec.nodeName=' + node.metadata.name  
          ret = api.list_pod_for_all_namespaces(watch=False,  
                field_selector=field_selector)   
          for pod in ret.items:    
              self.collected_data = pod    
              self.transform()
    def transform(self):               
        # transform the collected data to TCOMetric
          pod = KubernetesPod(discoveryID=discoveryID, timestamp=timestamp,
                name=name, type="KubernetesPod",
                domain=domain, Source=source, jobID=jobID,
                groupName=groupName, action=action)
          node = KubernetesWorker(discoveryID=discoveryID, 
                 timestamp=timestamp, name=name,
                 type="KubernetesWorker", domain=domain, Source=Source, 
                 jobID=jobID, groupName=groupName, action=action,
                 properties=properties, relations=relations)
          node.add_Contains(pod)
          pod.add_ContainedBy(node)
          self.publish(pod)
          return pod