Complete example of a custom Kafka collector: a parent process supervises a pool of consumer processes, and each consumer process hands polled messages to short-lived transform threads before publishing.
import json
import multiprocessing
import sys
import threading
import time
from multiprocessing import Process
from queue import Queue

from confluent_kafka import Consumer, KafkaException

# StreamCollector and its publish() helper come from the surrounding
# collector framework.

class MultiThreadedKafkaCollector(StreamCollector):
    def __init__(self, logger, config) -> None:
        self._config = config
        self.event = multiprocessing.Event()  # signals all child processes to exit
        self.retries = 0
        super().__init__(logger, config)

    def invoke(self, command: str):
        self.collect()

    def collect(self):
        # Supervise a pool of consumer processes, respawning any that die.
        workers = []
        num_workers = self._config.get_num_workers
        while True:
            try:
                if self.event.is_set():
                    print("Exiting all child processes..")
                    for w in workers:
                        w.terminate()
                    sys.exit(1)
                num_alive = len([w for w in workers if w.is_alive()])
                if num_workers == num_alive:
                    time.sleep(1)  # all workers healthy; avoid busy-waiting
                    continue
                for _ in range(num_workers - num_alive):
                    p = Process(target=self._consume, daemon=True, args=())
                    p.start()
                    workers.append(p)
            except Exception as e:
                print("Exception in process ", e)

    def _consume(self):
        # Runs inside a child process: poll Kafka and hand each message to a
        # short-lived transform thread; the bounded queue provides backpressure.
        consumer_config = {}  # set configuration (brokers, group id, ...)
        num_threads = self._config.get_num_threads
        topics = self._config.get_topic_name
        consumer = Consumer(consumer_config)
        consumer.subscribe([topics])
        msg_queue = Queue(maxsize=num_threads)
        while True:
            try:
                msg = consumer.poll(60)
                if msg is None:
                    continue
                if msg.error():
                    raise KafkaException(msg.error())
                msg_queue.put(msg)  # blocks once num_threads messages are in flight
                t = threading.Thread(target=self._transform_message,
                                     args=(msg_queue, consumer))
                t.start()
            except Exception as e:
                self.retries += 1  # retry logic

    def _transform_message(self, msg_queue, consumer) -> None:
        msg = msg_queue.get(timeout=60)
        output_data = {}
        collected_data = json.loads(msg.value().decode('utf-8'))
        output_data['instance'] = collected_data.get("labels", {}).get("instance")
        # set other output data
        self.publish(output_data)  # publish the transformed TCOBase payload
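To show the example end to end, here is a minimal sketch of the wiring around the collector. The CollectorConfig class and the StreamCollector stub below are hypothetical stand-ins for the framework types the example assumes; they are not the framework's actual API, and in a real module they would be defined (or imported) before the collector class.

import logging
from dataclasses import dataclass

@dataclass
class CollectorConfig:
    # attribute-style access matches the collector (config.get_num_workers, ...)
    get_num_workers: int = 4
    get_num_threads: int = 8
    get_topic_name: str = "metrics"  # hypothetical topic name

class StreamCollector:
    # hypothetical stub of the assumed base class
    def __init__(self, logger, config):
        self._logger = logger

    def publish(self, data):
        print("published:", data)  # a real collector would forward this downstream

if __name__ == "__main__":
    collector = MultiThreadedKafkaCollector(
        logging.getLogger("kafka-collector"), CollectorConfig())
    collector.invoke("start")  # blocks; call collector.event.set() to stop

Note that collect() loops forever, so invoke() only returns after collector.event is set (for example, from a signal handler) and the child processes have been terminated.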