The complete custom Kafka collector process, with sample code, consists of three steps:

1.    Collect – collects data from the source endpoints.

2.    Consume – consumes data from Kafka.

3.    Transform – transforms the collected data into metrics.

Internally, the transformed data is published to the Kafka bus for further processing.

from multiprocessing import Process

from vmware.tcsa.collector_sdk.collectors.stream_collector import StreamCollector


class KafkaCollector(StreamCollector):

    def __init__(self, logger, config) -> None:
        self._config = config
        super(KafkaCollector, self).__init__(logger, config)

    def collect(self):
        workers = []
        num_workers = self._config.get_num_workers
        while True:
            # Count the worker processes that are still running and
            # spawn replacements until the configured count is reached.
            num_alive = len([w for w in workers if w.is_alive()])
            if num_workers == num_alive:
                continue
            for _ in range(num_workers - num_alive):
                p = Process(target=self._consume, daemon=True, args=())
                p.start()
                workers.append(p)
                self._logger.info('Starting worker #%s', p.pid)

    def _consume(self):
        # write logic to consume data from the Kafka endpoint
        pass

    def _transform(self):
        # transform the collected data into metrics
        pass
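The `_consume` and `_transform` bodies are left for the collector author to fill in. As a minimal sketch of what a transform step might look like, the snippet below converts a raw Kafka record (JSON bytes) into a flat metric dictionary. The field names used here (`instance`, `metricType`, `value`, `timestamp`) are illustrative assumptions, not the SDK's actual metric schema; consult the collector SDK for the required output format.

```python
import json

def transform_record(raw_value: bytes) -> dict:
    """Hypothetical transform: parse a JSON Kafka record into a metric dict.

    The input and output field names are assumptions for illustration only;
    the real metric schema is defined by the collector SDK.
    """
    payload = json.loads(raw_value)
    return {
        "instance": payload["host"],        # which host the metric came from
        "metricType": payload["name"],      # e.g. "cpu.load"
        "value": float(payload["value"]),   # normalize to a numeric value
        "timestamp": payload["ts"],         # source-provided epoch timestamp
    }

# Example: a record as it might arrive from the Kafka endpoint.
record = b'{"host": "web-01", "name": "cpu.load", "value": "0.75", "ts": 1700000000}'
print(transform_record(record))
```

In the collector above, `_consume` would read such records from the Kafka endpoint and pass each one through a transform like this before the result is published onward.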

The complete example code for the REST and Kafka collectors can be found in the /examples directory.