The Complete custom kubernetes events collector process with sample code.
To write custom collector, need to implement Stream Collector and override the definitions provided such as below:
1. Collect – collect data from any kubernetes cluster using k8s python client provided api's
In this, used below api to get events data from kubernetes namespaces:
res = watch.Watch().stream(api.list_namespace)
2. Transform – transform collected data into metric i.e. Type of TCOEvent
Later, the transformed data publishes to Kafka bus for further processing
class EventsCollector(StreamCollector): def __init__(self, logger, config: dict) -> None: self._config = config super(BatchCollector, self).__init__(logger, config) def collect(self): # coonect to k8s server using k8s python client by providing auth configuration api = client.CoreV1Api(client.ApiClient(configuration)) w = watch.Watch() res = w.stream(api.list_namespace) for event in res: self.collected_data = event self.transform() def transform(self): # transform the collected data to TCOEvent event = TCOEvent(Source=output_data.get("Source"), Name=output_data.get("Name"), InstanceName=output_data.get("InstanceName"), State=output_data.get("State"), Severity=output_data.get("Severity")) return event