The complete custom Kafka collector process, with sample code:
1. Collect – collects data from source endpoints.
2. Consume – consumes data from kafka.
3. Transform – transforms collected data to metric.
Internally, the transformed data is published to the Kafka bus for further processing.
from vmware.tcsa.collector_sdk.collectors.stream_collector import StreamCollector

from multiprocessing import Process
import time


class KafkaCollector(StreamCollector):
    """Sample stream collector that consumes data from a Kafka endpoint.

    `collect` supervises a pool of daemon worker processes; each worker runs
    `_consume`, and `_transform` converts the consumed payloads into metrics.
    `_consume` and `_transform` are stubs to be filled in by the implementer.
    """

    def __init__(self, logger, config) -> None:
        """Store the collector config and initialize the SDK base class.

        :param logger: logger instance used for operational messages.
        :param config: SDK-provided collector configuration object.
        """
        self._config = config
        # BUG FIX: the original called super(StreamCollector, self).__init__,
        # which starts the MRO lookup *after* StreamCollector and therefore
        # skips StreamCollector.__init__ entirely. Plain super() calls it.
        super().__init__(logger, config)

    def collect(self):
        """Run forever, keeping `num_workers` consumer processes alive.

        Dead workers are detected on each pass and replaced with fresh
        daemon processes running `_consume`.
        """
        workers = []
        # NOTE(review): the original reads `get_num_workers` without calling
        # it — if this is a method rather than a property on the SDK config
        # class, the comparison below never matches an int; verify.
        num_workers = self._config.get_num_workers
        while True:
            num_alive = len([w for w in workers if w.is_alive()])
            if num_workers == num_alive:
                # BUG FIX: the original `continue`d immediately, busy-spinning
                # a full CPU core while all workers were healthy.
                time.sleep(1)
                continue
            for _ in range(num_workers - num_alive):
                p = Process(target=self._consume, daemon=True, args=())
                p.start()
                workers.append(p)
                self._logger.info('Starting worker #%s', p.pid)

    def _consume(self):
        # write logic to consume data from kafka end point
        # BUG FIX: a comment-only body is a syntax error; stub must raise/pass.
        raise NotImplementedError("implement Kafka consumption logic")

    def _transform(self):
        # transform the collected data to metric
        raise NotImplementedError("implement data-to-metric transformation")
The complete example code for the REST and Kafka collectors can be found in the /examples directory.