Event-Processing-Manager configuration

As part of management pack installation through the controller, the Kafka Event Adapter module is installed along with its dependency modules, such as the Event-Processing-Manager.

In the example below, everything that the smarts-listener receives is forwarded to the kafka-event-adapter, which writes it to Kafka.

File: <DCF-Install>/Event-Processing/Event-Processing-Manager/<INSTANCE>(ex: smarts-notifs-events)/conf/processing.xml

<processing-manager xmlns="http://www.watch4net.com/Events/DefaultProcessingManager" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.watch4net.com/Events/DefaultProcessingManager DefaultProcessingManager.xsd ">
        <processing-element name="KAFKA" enabled="true" type="Kafka-Event-Adapter" config="Kafka-Event-Adapter/smarts-notifs-events/conf/kafka-event-adapter.xml"/>
        <processing-element name="Smarts" enabled="true" config="Smarts-Listener/smarts-notifs-events/conf/smarts-listener.xml" data="KAFKA"/>
        <processing-element name="EVENT-SPY" enabled="true" type="EventSpy" config="Event-Processing-Utils/smarts-notifs-events/conf"/>
</processing-manager>

The KAFKA processing element publishes the event data received from the Smarts processing element into Kafka.

Kafka event adapter configuration

The Kafka Event Adapter comes with a configuration file located at:

<DCF-Install>/Event-Processing/Kafka-Event-Adapter/<INSTANCE>/conf/kafka-event-adapter.xml

Sample configuration file:

<kafka-event-adapter-config xmlns="http://www.watch4net.com/KafkaEventAdapter" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://www.watch4net.com/KafkaEventAdapter ../kafka-event-adapter.xsd ">

	<cluster>
		<!-- The Kafka bootstrap servers. -->
		<server host="127.0.0.1" port="9092" />
		<!-- Uncomment the following lines to communicate with Kafka over SSL. Kafka SSL must be configured first.
		<additional-parameters key="security.protocol">SSL</additional-parameters>
		<additional-parameters key="ssl.keystore.location">../../../Tools/Webservice-Gateway/Default/conf/clientkeystore</additional-parameters>
		<additional-parameters key="ssl.keystore.password">changeit</additional-parameters>
		<additional-parameters key="ssl.truststore.location">../../../Tools/Webservice-Gateway/Default/conf/truststore</additional-parameters>
		<additional-parameters key="ssl.truststore.password">changeit</additional-parameters> -->
	</cluster>

	<!-- an event reader -->
	
	<event-reader topic-name="events" stream="data">
		<initial-topic-partition-offset-seeker existing-reset-policy="earliest" new-reset-policy="earliest"/>
  		<topic-partition-offset-commit location="disk"/>
 		<topic-partition-listener existing-reset-policy="earliest" new-reset-policy="earliest" refresh-interval="5m"/>
	</event-reader>
	 
	<event-reader topic-name="events" stream="data">
		<kafka-consumer-task>
			<consumer group-id="test-consumer-group" />
		</kafka-consumer-task>
	</event-reader> 

	<!-- an event writer -->
	 	
	<event-writer topic-name="sam_notifications" stream="data" isJson="true">
		<kafka-producer-settings>
			<producer compression-type="none"/>
		</kafka-producer-settings>
	</event-writer>
</kafka-event-adapter-config>

Multiple Kafka producer configuration

You can modify the event processing configuration file (processing.xml) manually to send the collected event data to multiple Kafka endpoints.

For example:

Copy the existing Kafka configuration file kafka-event-adapter.xml to kafka-event-adapter2.xml in the same folder (sample path: <DCF-Install>/Event-Processing/Kafka-Event-Adapter/<INSTANCE>/conf/).

Add the new Kafka cluster information in kafka-event-adapter2.xml.
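For instance, the <cluster> section of kafka-event-adapter2.xml might point at the second endpoint. The host and port below are placeholders; replace them with the bootstrap server of your second Kafka cluster:

```xml
	<cluster>
		<!-- Bootstrap server of the second Kafka endpoint (placeholder values). -->
		<server host="10.0.0.2" port="9092" />
	</cluster>
```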

Alter the configuration file processing.xml as shown below:

<processing-element name="KAFKA1" enabled="true" type="Kafka-Event-Adapter" config="Kafka-Event-Adapter/smarts-notifs-events/conf/kafka-event-adapter.xml"/>
<processing-element name="KAFKA2" enabled="true" type="Kafka-Event-Adapter" config="Kafka-Event-Adapter/smarts-notifs-events/conf/kafka-event-adapter2.xml"/>
<processing-element name="Smarts" enabled="true" config="Smarts-Listener/smarts-notifs-events/conf/smarts-listener.xml" data="KAFKA1 KAFKA2"/>

Restart the smarts event management pack service.

Smarts event data is now pushed to both Kafka endpoints, KAFKA1 and KAFKA2.

Kafka Event Adapter parameters

<cluster>: This tag must occur at least once.

<server>: This tag must occur at least once.

* host: The address of one of the Kafka bootstrap servers.

* port: The port that the bootstrap server is listening on.

<producer>: This tag is optional, but may be used for templating Kafka producers. Refer to the schema for more information.

<consumer>: This tag is optional, but may be used for templating Kafka consumers. Refer to the schema for more information.

<connection>: This tag is optional, but may be used for templating Kafka connections. Refer to the schema for more information.

<additional-parameters>: This tag is optional, but may be used for configuring Kafka options outside the purview of the previous tags. These options include, but are not limited to, SSL connection parameters.
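As an illustrative sketch only, a <cluster> section that uses the templating tags might look like the following. The exact nesting is governed by kafka-event-adapter.xsd, the producer and connection attributes are taken from the sample producer configuration later in this section, and the client.id value is a placeholder:

```xml
	<cluster>
		<server host="127.0.0.1" port="9092" />
		<!-- Template applied to producers created against this cluster. -->
		<producer compression-type="gzip" acks="1" />
		<!-- Template applied to connections to this cluster. -->
		<connection request-timeout-ms="5s" />
		<!-- Extra Kafka option not covered by the tags above (placeholder value). -->
		<additional-parameters key="client.id">dcf-event-adapter</additional-parameters>
	</cluster>
```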

Configuring Event Reader parameters

<event-reader>: This tag is used to define a component that consumes from Kafka.

* topic-name: The Kafka topic to read from.

* stream: The stream to push events to once they have been read.

<kafka-consumer-task>: This tag may be used to customize the way the adapter reads from Kafka.

* poll-timeout-ms: The poll timeout for Kafka.

* error-timeout-ms: The error timeout for connecting to Kafka.

* error-timout-double-n: The number of errors after which the timeout is doubled.

* error-timout-max-ms: The maximum error timeout.

<consumer>: This tag may be used to customize how this element consumes from Kafka. Note that the group-id attribute must be set here if you plan on joining a Kafka consumer group.

<connection>: This tag may be used to customize the Kafka connection for this element.

<additional-parameters>: This tag may be used to customize the additional parameters for this element.

<initial-topic-partition-offset-seeker>: This element is used to control how the adapter seeks in a Kafka topic.

* existing-reset-policy: Policy used when seeking on an existing partition whose requested offset no longer exists. Must be one of earliest or latest.

* new-reset-policy: Policy used when seeking on a new partition. Must be one of earliest or latest.

<topic-partition-offset-commit>: This element must be used to control how offsets are committed.

* location: Where to commit the topic partition offsets. Must be one of disk, kafka, or none.

* commit-interval: Offsets are committed no more frequently than this interval.

<topic-partition-listener>: Required if a Kafka consumer group is not set. Used to refresh the list of topic partitions periodically.

* new-reset-policy: Where to seek in new partitions (at the earliest or latest offset in the new partition; usually the earliest).

* existing-reset-policy: Where to seek in existing partitions when the current offset is out of range (usually the earliest).

* refresh-interval: How often to check for new partitions.
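Combining these parameters, a reader that joins a consumer group and tunes its polling might look like the following sketch. The timeout values are illustrative, the group-id is a placeholder, and the exact attribute placement is governed by the schema:

```xml
	<event-reader topic-name="events" stream="data">
		<kafka-consumer-task poll-timeout-ms="500" error-timeout-ms="1000">
			<!-- Joining a consumer group; offsets are then managed through Kafka. -->
			<consumer group-id="dcf-event-readers" />
		</kafka-consumer-task>
	</event-reader>
```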

Configuring Event Writer parameters

<event-writer>: This tag is used to define a component that writes to Kafka.

* topic-name: The topic to write to.

* stream: The event stream to consume from.

* isJson: When true, output is written in JSON format.

<kafka-producer-settings>: This tag may be used to customize how the writer writes to Kafka. Refer to the schema for more information.

<connector-component-behavior>: This tag may be used to control how often the writer flushes to Kafka.

<key-encoder>: This tag may be used to customize how the Kafka key is encoded.

<value-encoder>: This tag may be used to customize how the Kafka value is encoded.

A sample Kafka producer configuration (for the Event Writer) is shown below:

<kafka-producer-settings>	
	    <!-- Servers information -->    
  		<server host="10.106.126.209"/>
	    
	    <!-- Producer (and topic) information -->
	    <producer topic-name="events"
	        acks="1"
	        retries="0"
	        linger-ms="0ms"
	        buffer-memory="33554432"
	        compression-type="none"
	        batch-size="16384"
	        max-block-ms="2s"
	        max-in-flight-requests-per-connection="5"
	        max-request-size="1048576"/>
	        
	    <!-- All connection parameters will be attributes except "server" -->
	    <connection request-timeout-ms="5s"
	        connections-max-idle-ms="9m"
	        retry-backoff-ms="100ms"
	        reconnect-backoff-ms="50ms"/>
	        
	    <!-- Additional properties -->
	    <additional-parameters key="metadata.max.age.ms">1000</additional-parameters>
	    <additional-parameters key="receive.buffer.bytes">32768</additional-parameters>
	    <additional-parameters key="send.buffer.bytes">131072</additional-parameters>
	</kafka-producer-settings>