The complete custom Kubernetes events collector process, with sample code.

To write a custom collector, subclass StreamCollector and override the methods it defines, as described below:

1.    Collect – collect data from any Kubernetes cluster using the APIs provided by the k8s Python client

This example uses the following API call to stream events data from Kubernetes namespaces:

res = watch.Watch().stream(api.list_namespace)

2.   Transform – transform the collected data into a metric, i.e. an object of type TCOEvent

The transformed data is later published to the Kafka bus for further processing.
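As a rough illustration of the two steps, the sketch below shows the shape of one item yielded by the watch stream and one possible way to map it onto the five TCOEvent fields used in this document. `TCOEventSketch`, `to_tco_event`, the "kubernetes" source string, and the severity rule are all assumptions made for illustration; the real TCOEvent class and field mapping come from the collector framework and may differ.

```python
from dataclasses import dataclass, asdict


@dataclass
class TCOEventSketch:
    """Hypothetical stand-in for TCOEvent with the five fields used in this document."""
    Source: str
    Name: str
    InstanceName: str
    State: str
    Severity: str


def to_tco_event(watch_event: dict) -> TCOEventSketch:
    """Map one watch event to the stand-in metric (the mapping itself is an assumption)."""
    # The k8s Python client yields dicts with a "type" key (such as ADDED,
    # MODIFIED or DELETED), an "object" key (deserialized model) and a
    # "raw_object" key (the plain dict returned by the API server).
    raw = watch_event.get("raw_object", {})
    metadata = raw.get("metadata", {})
    event_type = watch_event.get("type", "UNKNOWN")
    return TCOEventSketch(
        Source="kubernetes",
        Name=raw.get("kind", "Namespace"),
        InstanceName=metadata.get("name", ""),
        State=event_type,
        # Illustrative rule only: treat deletions as warnings.
        Severity="WARNING" if event_type == "DELETED" else "INFO",
    )


# A hand-written sample in the shape of one watch event, so no cluster is needed.
sample = {
    "type": "ADDED",
    "raw_object": {"kind": "Namespace", "metadata": {"name": "demo"}},
}
metric = to_tco_event(sample)
print(asdict(metric))
```

Keeping the mapping in a plain function that takes a dict makes it easy to try against recorded events before wiring it into the collector's transform method.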

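from kubernetes import client, watch

# StreamCollector and TCOEvent are provided by the collector framework.


class EventsCollector(StreamCollector):
    def __init__(self, logger, config: dict) -> None:
        self._config = config
        super().__init__(logger, config)

    def collect(self):
        # Connect to the k8s server using the k8s Python client.
        # `configuration` is assumed to be a client.Configuration object
        # holding the auth settings (for example, built from self._config).
        api = client.CoreV1Api(client.ApiClient(configuration))
        w = watch.Watch()
        res = w.stream(api.list_namespace)
        for event in res:
            # Keep the latest event and hand it to the transform step.
            self.collected_data = event
            self.transform()

    def transform(self):
        # Transform the collected data into a TCOEvent.
        # output_data is assumed to be the dict derived from the collected event.
        output_data = self.collected_data
        event = TCOEvent(Source=output_data.get("Source"),
                         Name=output_data.get("Name"),
                         InstanceName=output_data.get("InstanceName"),
                         State=output_data.get("State"),
                         Severity=output_data.get("Severity"))
        return event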
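The sample class stops at returning the event, so the publishing step mentioned earlier is not shown. A minimal sketch of what it might look like follows, assuming JSON-encoded payloads and the kafka-python package; the source does not say which Kafka client or payload format is used. The helper names, the `RecordingProducer` stand-in, and the topic name "k8s-events" are hypothetical.

```python
import json


def serialize_event(event_fields: dict) -> bytes:
    """Encode the event fields as UTF-8 JSON bytes (assumed payload format)."""
    return json.dumps(event_fields, sort_keys=True).encode("utf-8")


def publish_event(producer, topic: str, event_fields: dict) -> None:
    """Send one serialized event via any producer exposing send() and flush()."""
    producer.send(topic, serialize_event(event_fields))
    producer.flush()  # block until the message has been handed off


def make_producer(bootstrap_servers: str):
    """Create a kafka-python producer; imported lazily so the helpers above work without it."""
    from kafka import KafkaProducer  # assumption: the kafka-python package
    return KafkaProducer(bootstrap_servers=bootstrap_servers)


class RecordingProducer:
    """In-memory stand-in used to try the helpers without a broker."""

    def __init__(self):
        self.sent = []

    def send(self, topic, value):
        self.sent.append((topic, value))

    def flush(self):
        pass


# Try the publish path without a running Kafka broker.
fake = RecordingProducer()
publish_event(fake, "k8s-events",
              {"Source": "kubernetes", "Name": "Namespace", "State": "ADDED"})
print(fake.sent[0])
```

Against a real broker, the stand-in would be replaced by the object returned from `make_producer`, with the broker address taken from the collector's configuration.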