The complete custom Kubernetes metrics collector process, with sample code.

To write a custom collector, implement BatchCollector and override the methods it defines, as described below:

1.    Collect – collect data from any Kubernetes cluster using the APIs provided by the k8s Python client

This example uses the API call below to get metric data from the Kubernetes cluster nodes:

api.list_cluster_custom_object("metrics.k8s.io", "v1beta1", "nodes")
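
To make the shape of the returned data concrete, here is a minimal sketch of reading per-node usage from a metrics.k8s.io response. The payload is an illustrative sample rather than output from a real cluster, and the helpers parse_quantity and node_usage are hypothetical names that are not part of the k8s Python client. The sketch handles only the common quantity suffixes.

```python
from decimal import Decimal

# Multipliers for the Kubernetes quantity suffixes handled by this sketch.
_SUFFIXES = {
    "n": Decimal("1e-9"), "u": Decimal("1e-6"), "m": Decimal("1e-3"),
    "k": Decimal(10) ** 3, "M": Decimal(10) ** 6, "G": Decimal(10) ** 9,
    "Ki": Decimal(2) ** 10, "Mi": Decimal(2) ** 20, "Gi": Decimal(2) ** 30,
}


def parse_quantity(text: str) -> Decimal:
    """Convert a quantity string such as '250m' or '2048Ki' to a number."""
    # Check two-character suffixes ('Ki') before one-character ones ('k').
    for suffix in sorted(_SUFFIXES, key=len, reverse=True):
        if text.endswith(suffix):
            return Decimal(text[: -len(suffix)]) * _SUFFIXES[suffix]
    return Decimal(text)


# Illustrative sample shaped like a NodeMetricsList (values are made up).
sample_response = {
    "kind": "NodeMetricsList",
    "apiVersion": "metrics.k8s.io/v1beta1",
    "items": [
        {
            "metadata": {"name": "node-1"},
            "timestamp": "2024-01-01T00:00:00Z",
            "window": "30s",
            "usage": {"cpu": "250m", "memory": "2048Ki"},
        }
    ],
}


def node_usage(response: dict) -> dict:
    """Map each node name to a (cpu cores, memory bytes) tuple."""
    return {
        item["metadata"]["name"]: (
            parse_quantity(item["usage"]["cpu"]),
            parse_quantity(item["usage"]["memory"]),
        )
        for item in response["items"]
    }


usage = node_usage(sample_response)
```

Each entry under "items" describes one node, which is why the collector reads that key from the response.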

2.   Transform – transform the collected data into a metric, i.e. an object of type TCOMetric

The transformed data is then published to the Kafka bus for further processing. A sample collector:

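from kubernetes import client
# BatchCollector and TCOMetric come from the collector SDK; import them
# from wherever your SDK version exposes them (not shown here).

class MetricsCollector(BatchCollector):
    def __init__(self, logger, config: dict) -> None:
        self._config = config
        # Initialise the BatchCollector base class.
        super().__init__(logger, config)

    def collect(self):
        # Connect to the k8s server using the k8s Python client.
        # `configuration` is a client.Configuration holding the auth
        # settings; it must be built before this call (not shown here).
        api = client.CustomObjectsApi(client.ApiClient(configuration))
        k8s_nodes = api.list_cluster_custom_object("metrics.k8s.io",
                                                   "v1beta1", "nodes")
        # Keep the stats of every node, not just the first one.
        self.collected_data = k8s_nodes["items"]
        return self.collected_data

    def transform(self):
        # Transform each collected item into a TCOMetric. Each item is
        # assumed to carry the keys read below; mapping the raw API
        # response onto these keys is not shown here.
        metrics = []
        for output_data in self.collected_data:
            metrics.append(TCOMetric(output_data.get("instance"),
                                     output_data.get("timestamp"),
                                     output_data.get("metrics"),
                                     output_data.get("properties"),
                                     output_data.get("tags")))
        return metrics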
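
The sample does not show how output_data gets the keys that transform() reads. One possible mapping from a metrics.k8s.io node item is sketched below. This is an assumption rather than the SDK's prescribed mapping: MetricStandIn is a hypothetical stand-in for TCOMetric that only mirrors the argument order used above, and the choice of what goes into properties and tags is purely illustrative.

```python
from dataclasses import dataclass, field


@dataclass
class MetricStandIn:
    """Hypothetical stand-in for TCOMetric (same argument order as the sample)."""
    instance: str
    timestamp: str
    metrics: dict
    properties: dict = field(default_factory=dict)
    tags: dict = field(default_factory=dict)


def to_output_data(item: dict) -> dict:
    """Reshape one node item into the keys that transform() reads."""
    return {
        "instance": item["metadata"]["name"],        # node name
        "timestamp": item["timestamp"],              # sample time from the API
        "metrics": dict(item["usage"]),              # raw cpu/memory quantities
        "properties": {"window": item.get("window")},
        "tags": {"source": "metrics.k8s.io"},        # illustrative tag
    }


def transform_items(items: list) -> list:
    """Build one metric object per collected node item."""
    result = []
    for item in items:
        output_data = to_output_data(item)
        result.append(MetricStandIn(output_data.get("instance"),
                                    output_data.get("timestamp"),
                                    output_data.get("metrics"),
                                    output_data.get("properties"),
                                    output_data.get("tags")))
    return result


# Illustrative input shaped like one entry of the "items" list.
example_items = [{
    "metadata": {"name": "node-1"},
    "timestamp": "2024-01-01T00:00:00Z",
    "window": "30s",
    "usage": {"cpu": "250m", "memory": "2048Ki"},
}]
example_metrics = transform_items(example_items)
```

Keeping the reshaping in its own function separates the Kubernetes-specific field names from the metric construction, so either side can change without touching the other.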