Complete example of a custom Kafka collector.

class MultiThreadedKafkaCollector(StreamCollector):
    """Stream collector that reads a Kafka topic with several worker processes.

    ``collect`` supervises a pool of child processes. Each child runs
    ``_consume``, which polls Kafka and starts one thread per message to run
    ``_transform_message``.
    """

    # Seconds the supervisor waits between checks of the worker pool.
    _POLL_INTERVAL_SECONDS = 1.0

    def __init__(self, logger, config) -> None:
        """Store the configuration and initialise shutdown/retry state.

        Args:
            logger: Logger forwarded to the base class initialiser.
            config: Configuration object; this class reads
                ``get_num_workers``, ``get_num_threads`` and
                ``get_topic_name`` from it.
        """
        self._config = config
        # Checked by collect(); when set, all workers are terminated.
        self.event = multiprocessing.Event()
        self.retries = 0
        # Zero-argument super() so that StreamCollector.__init__ itself runs;
        # super(StreamCollector, self) would skip it.
        super().__init__(logger, config)

    def invoke(self, command: str):
        """Start collecting; ``command`` is accepted but not used here."""
        self.collect()

    def collect(self):
        """Keep the configured number of consumer processes alive.

        Loops forever, replacing workers that have died. When ``self.event``
        is set, every worker is terminated and the process exits with
        status 1.
        """
        workers = []
        # NOTE(review): read as an attribute, not called -- presumably a
        # property on the config object; confirm.
        num_workers = self._config.get_num_workers
        while True:
            try:
                if self.event.is_set():
                    print("Exiting all child process..")
                    for worker in workers:
                        worker.terminate()
                    # Exit only after every worker has been terminated.
                    sys.exit(1)
                # Forget dead workers so the list does not grow without bound.
                workers = [w for w in workers if w.is_alive()]
                for _ in range(num_workers - len(workers)):
                    p = Process(target=self._consume, daemon=True, args=())
                    p.start()
                    workers.append(p)
            except Exception as e:
                print("Exception in process ", e)
            # Pause between passes instead of spinning; returns early as soon
            # as the shutdown event is set.
            self.event.wait(self._POLL_INTERVAL_SECONDS)

    def _consume(self):
        """Poll Kafka forever and start a thread for each received message."""
        consumer_config = {}  # set configuration
        # NOTE(review): both read as attributes, not called -- presumably
        # properties on the config object; confirm.
        num_threads = self._config.get_num_threads
        topics = self._config.get_topic_name
        consumer = Consumer(consumer_config)
        consumer.subscribe([topics])
        msg_queue = Queue(maxsize=num_threads)
        while True:
            try:
                msg = consumer.poll(60)
                if msg is None:
                    continue
                if msg.error():
                    raise KafkaException(msg.error())
                # Blocks while the queue already holds num_threads messages.
                msg_queue.put(msg)
                t = threading.Thread(
                    target=self._transform_message,
                    args=(msg_queue, consumer),
                )
                t.start()
            except Exception as e:
                # retry logic
                # TODO: implement retry/back-off here (presumably using
                # self.retries and self.event -- confirm). Until then the
                # error is reported rather than silently dropped.
                print("Exception in consumer ", e)

    def _transform_message(self, msg_queue, consumer) -> TCOBase:
        """Take one message from the queue, decode it and publish a metric.

        Args:
            msg_queue: Queue filled by ``_consume``; one message is taken,
                waiting at most 60 seconds.
            consumer: The Kafka consumer; not used by the visible code.

        NOTE(review): annotated as returning ``TCOBase`` but nothing is
        returned by the visible code; confirm the intended contract.
        """
        msg = msg_queue.get(timeout=60)
        output_data = {}
        collected_data = json.loads(msg.value().decode('utf-8'))
        output_data['instance'] = collected_data.get("labels").get("instance")
        # set other output data
        # TODO: build `metric` from output_data before publishing; it is not
        # defined anywhere in this example.
        self.publish(metric)