This section describes the complete custom NETCONF collector process with sample code.
To write a custom collector, implement the StreamCollector class and override the methods described below:
1. Collect – collect data from the source system (for example, from a Kubernetes cluster using the APIs provided by the k8s Python client). For this NETCONF collector, use the Python ncclient library to connect to a NETCONF device and retrieve data:
self._session = manager.connect_ssh(host=self._config.connect.host,
                                    port=self._config.connect.port,
                                    username=self._config.connect.username,
                                    password=self._config.connect.password,
                                    hostkey_verify=False,
                                    sock=self._proxy_sock,
                                    manager_params={"timeout": 120},
                                    keepalive=True)
From this session you can then create a subscription and read notifications, as shown in the sketch below.
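A minimal sketch of that step using ncclient's subscription support (the block flag and 60-second timeout here are illustrative values, not framework requirements):

def _read_one_notification(self):
    self._session.create_subscription()                 # send <create-subscription> (RFC 5277) on the default stream
    notification = self._session.take_notification(block=True, timeout=60)
    if notification is not None:
        xml_root = notification.notification_ele        # parsed lxml element of the <notification> payload
        self._logger.debug(notification.notification_xml)  # raw XML, useful while developing the transform step
        return xml_root
    return None

take_notification() blocks until a notification arrives (or returns None when the timeout expires), which is why the full sample below calls it inside a while True loop.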
2. Transform – transform the collected data into a metric, that is, an object of type TCOEvent, TCOMetric, or TCOTopology.
The transformed data is then published to the Kafka bus for further processing. The complete sample collector:
import base64
import os
import threading

import paramiko
from ncclient import manager

# StreamCollector and TopoCache are provided by the collector framework;
# import them from the appropriate framework modules in your environment.


class NetConfCollector(StreamCollector):

    def __init__(self, logger, config):
        self._config = config
        if self._config.connect.gateway.enabled:
            # Write the SSH gateway private key to disk with owner-only permissions
            pkey_filename = "/app/config/{}.pkey".format(self._config.connect.gateway.host)
            pkey_fd = os.fdopen(os.open(pkey_filename, os.O_WRONLY | os.O_CREAT, 0o600), "w")
            pkey_fd.write(base64.b64decode(self._config.connect.gateway.key).decode())
            pkey_fd.close()
        self.connect()
        super(NetConfCollector, self).__init__(logger, config)
        self._topoCache = TopoCache(self._session, self._logger, self._config)
        self._topoCache.start()
        t = threading.Thread(target=self.reconnect, args=())
        t.start()

    def connect(self):
        self._proxy_sock = None
        if self._config.connect.gateway.enabled:
            # Tunnel the NETCONF connection through the SSH gateway
            pkey_filename = "{}.pkey".format(self._config.connect.gateway.host)
            proxy_cmd = "ssh -oServerAliveInterval=3600 -oStrictHostKeyChecking=no -q -W {}:{} {}@{} -i {}".format(
                self._config.connect.host, self._config.connect.port,
                self._config.connect.gateway.username, self._config.connect.gateway.host,
                pkey_filename)
            self._proxy_sock = paramiko.ProxyCommand(proxy_cmd)
            self._proxy_sock.settimeout(10)
        self._session = manager.connect_ssh(host=self._config.connect.host,
                                            port=self._config.connect.port,
                                            username=self._config.connect.username,
                                            password=self._config.connect.password,
                                            hostkey_verify=False,
                                            sock=self._proxy_sock,
                                            manager_params={"timeout": 120},
                                            keepalive=True)

    # override only if you want to change the default method execution sequence
    # from collect -> transform -> publish
    def invoke(self, command: chr):
        self.collect()

    def collect(self):
        self._session.create_subscription()
        self._logger.info('#%s - Starting the subscriber topic=%s', os.getpid(), "testtopic")
        while True:
            self._logger.info('#%s - Waiting for message...', os.getpid())
            try:
                msg = self._session.take_notification().notification_ele
                # self._logger.debug("Notification received", msg.text)
                if msg is None:
                    continue
                self._transform_message(msg)
            except Exception:
                self._logger.exception('#%s - Received exception.', os.getpid())

    # Get the current Problem List and emit notifications
    def _transform_message(self, msg):
        # apply custom transform on collected object
        self._logger.debug('started transform on data %s', msg)
        # self._logger.info(
        #     '#%sT%s - Received message: %s', os.getpid(), threading.get_ident(), msg.value().decode('utf-8'))
        notif_ns = "{urn:ietf:params:xml:ns:netconf:notification:1.0}"
        ns = "{urn:onf:params:xml:ns:yang:microwave-model}"
        for elem in msg.getchildren():
            if elem.tag == notif_ns + "eventTime":
                eventTime = elem.text
                continue
            for pnotif_elem in elem.getchildren():
                if pnotif_elem.tag == ns + "counter":
                    counter = pnotif_elem.text
                if pnotif_elem.tag == ns + "time-stamp":
                    ts = pnotif_elem.text
                if pnotif_elem.tag == ns + "problem":
                    problem = pnotif_elem.text
                if pnotif_elem.tag == ns + "severity":
                    severity = pnotif_elem.text
                if pnotif_elem.tag == ns + "object-id-ref":
                    object_id = pnotif_elem.text
            self.publishEvent(problem, eventTime, ts, object_id, severity)
        # self._logger.info("transform completed", output_data)
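Note that __init__ above starts a background thread targeting self.reconnect, whose body is not shown in the sample. A minimal, hypothetical sketch of what such a method might look like, assuming ncclient's Manager.connected property and the connect() method defined above (the 30-second poll interval is an arbitrary choice, and import time is also required):

    # Hypothetical helper, to be added inside NetConfCollector: re-establish the
    # NETCONF session and subscription if the connection drops.
    def reconnect(self):
        while True:
            time.sleep(30)
            try:
                if not self._session.connected:
                    self._logger.warning('#%s - Session lost, reconnecting...', os.getpid())
                    self.connect()
                    self._session.create_subscription()
            except Exception:
                self._logger.exception('#%s - Reconnect attempt failed.', os.getpid())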