The complete custom netconf collector process with sample code.

To write custom collector, you need to implement Stream Collector and override the definitions provided such as below:

1. Collect – collect data from any kubernetes cluster using k8s python client provided api's.

In this, use the ncclient library of python to connect to a netconf device and get data:

self._session = manager.connect_ssh(host=self._config.connect.host, port=self._config.connect.port,username=self._config.connect.username,password=self._config.connect.password, hostkey_verify=False, sock=self._proxy_sock, manager_params={"timeout": 120}, keepalive=True)

And from session you can create subscription and get data from after creating subsription.

2. Transform – transform collected data into metric, that is : Type of TCOEvent/TCOMetric or TCOTopology

Later, the transformed data publishes to Kafka bus for further processing:

class NetConfCollector(StreamCollector):

    def __init__(self, logger, config):
        self._config = config
        if self._config.connect.gateway.enabled:
            pkey_filename = "/app/config/{}.pkey".format(self._config.connect.gateway.host)
            pkey_fd = os.fdopen(os.open(pkey_filename, os.O_WRONLY | os.O_CREAT, 0o600), "w")
            pkey_fd.write(base64.b64decode(self._config.connect.gateway.key).decode())
            pkey_fd.close()
        self.connect()
        super(StreamCollector, self).__init__(logger, config)
        self._topoCache = TopoCache(self._session, self._logger, self._config)
        self._topoCache.start()
        t = threading.Thread(target=self.reconnect, args=())
        t.start()

    def connect(self):
        self._proxy_sock = None
        if self._config.connect.gateway.enabled:
            pkey_filename = "{}.pkey".format(self._config.connect.gateway.host)
            proxy_cmd = "ssh -oServerAliveInterval=3600 -oStrictHostKeyChecking=no -q -W {}:{} {}@{} -i {}".format(
                self._config.connect.host,
                self._config.connect.port,
                self._config.connect.gateway.username,
                self._config.connect.gateway.host,
                pkey_filename)
            self._proxy_sock = paramiko.ProxyCommand(proxy_cmd)
            self._proxy_sock.settimeout(10)
        self._session = manager.connect_ssh(host=self._config.connect.host, port=self._config.connect.port,
                                            username=self._config.connect.username,
                                            password=self._config.connect.password, hostkey_verify=False, sock=self._proxy_sock, manager_params={"timeout": 120}, keepalive=True)

    # override only if you want to change the default method execution sequence from collect->transform->publish
    def invoke(self, command: chr):
        self.collect()

    def collect(self):
        self._session.create_subscription()
        self._logger.info(
            '#%s - Starting  the subscriber topic=%s', os.getpid(), "testtopic")
        while True:
            self._logger.info('#%s - Waiting for message...', os.getpid())
            try:
                msg = self._session.take_notification().notification_ele
                # self._logger.debug("Notification received", msg.text)
                if msg is None:
                    continue
                self._transform_message(msg)
            except Exception:
                self._logger.exception('#%s - Received exception.', os.getpid())
        # Get the current Problem List and emit notifications

    def _transform_message(self, msg):
        # apply custom transform on collected object
        self._logger.debug('started transform on data %s', msg)
        # self._logger.info(
        #    '#%sT%s - Received message: %s', os.getpid(), threading.get_ident(), msg.value().decode('utf-8'))
        notif_ns = "{urn:ietf:params:xml:ns:netconf:notification:1.0}"
        ns = "{urn:onf:params:xml:ns:yang:microwave-model}"

        for elem in msg.getchildren():
            if elem.tag == notif_ns + "eventTime":
                eventTime = elem.text
                continue
            for pnotif_elem in elem.getchildren():
                if pnotif_elem.tag == ns + "counter":
                    counter = pnotif_elem.text
                if pnotif_elem.tag == ns + "time-stamp":
                    ts = pnotif_elem.text
                if pnotif_elem.tag == ns + "problem":
                    problem = pnotif_elem.text
                if pnotif_elem.tag == ns + "severity":
                    severity = pnotif_elem.text
                if pnotif_elem.tag == ns + "object-id-ref":
                    object_id = pnotif_elem.text
        self.publishEvent(problem, eventTime, ts, object_id, severity)
        # self._logger.info("transform completed", output_data)