The complete custom Kafka collector process, with sample code, consists of three steps:
1. Collect – collects data from the source endpoints.
2. Consume – consumes data from Kafka.
3. Transform – transforms the collected data into metrics.
Internally, the transformed data is published to the Kafka bus for further processing.
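As an illustration of that publish step, a metric can be serialized to a byte payload before it is handed to a Kafka producer. This is a minimal sketch assuming JSON encoding; the field names and the encoding are assumptions for illustration, not part of the collector SDK:

```python
import json

def serialize_metric(metric: dict) -> bytes:
    # Encode a metric dictionary as a UTF-8 JSON payload, the form a
    # Kafka producer typically accepts as a message value.
    return json.dumps(metric, sort_keys=True).encode("utf-8")

# Hypothetical metric as the transform step might emit it
metric = {"instance": "esx-01", "metricType": "cpu_usage", "value": 73.5}
payload = serialize_metric(metric)
```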
from multiprocessing import Process

from vmware.tcsa.collector_sdk.collectors.stream_collector import StreamCollector

class KafkaCollector(StreamCollector):
    def __init__(self, logger, config) -> None:
        self._config = config
        super().__init__(logger, config)

    def collect(self):
        # Maintain a pool of worker processes consuming from Kafka,
        # restarting any that have died.
        workers = []
        num_workers = self._config.get_num_workers
        while True:
            num_alive = len([w for w in workers if w.is_alive()])
            if num_workers == num_alive:
                continue
            for _ in range(num_workers - num_alive):
                p = Process(target=self._consume, daemon=True, args=())
                p.start()
                workers.append(p)
                self._logger.info('Starting worker #%s', p.pid)

    def _consume(self):
        # write logic to consume data from the Kafka endpoint
        pass

    def _transform(self):
        # transform the collected data into metrics
        pass
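The _consume and _transform hooks above are intentionally left as stubs. As one possible shape for the transform step, the sketch below parses a raw Kafka payload into a metric dictionary. The payload schema (JSON with host, name, and value fields) and the output field names are assumptions for illustration, not defined by the SDK:

```python
import json
import time

def transform(raw_payload: bytes) -> dict:
    # Parse a raw Kafka message payload (assumed JSON) and map it to a
    # metric dictionary; adapt the field names to the actual schema.
    record = json.loads(raw_payload)
    return {
        "instance": record["host"],
        "metricType": record["name"],
        "value": float(record["value"]),
        # Fall back to the current time (ms) if the payload has no timestamp
        "timestamp": record.get("timestamp", int(time.time() * 1000)),
    }

# Example: a payload as it might arrive from Kafka
payload = b'{"host": "esx-01", "name": "cpu_usage", "value": "73.5"}'
metric = transform(payload)
```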
The complete example code for the REST and Kafka collectors can be found in the /examples directory.