Apache Kafka®

Definition:

Apache Kafka is an open-source distributed event streaming platform.

What exactly does that mean?

A streaming platform has three key capabilities:

  • Publish and subscribe to streams of records, similar to a message queue or enterprise messaging system.

  • Store streams of records in a fault-tolerant durable way.

  • Process streams of records as they occur.

Kafka is generally used for two broad classes of applications:

  • Real-time streaming data pipelines that reliably get data between systems or applications.

  • Real-time streaming applications that transform or react to the streams of data.

To understand how Kafka does these things, let’s dive in and explore Kafka’s capabilities from the bottom up.

  • Kafka is run as a cluster on one or more servers that can span multiple data centers.

  • The Kafka cluster stores streams of records in categories called topics.

  • Each record consists of a key, a value, and a timestamp.

Food for thought

  • Schema Registry

Schema Registry is to Kafka what XSD is to XML. When producers and consumers are connected to a Kafka stream, the two parties need a contract: the format in which the producer publishes data on a Kafka topic must match what the consumer expects to consume. Schema Registry plays a significant role in defining this contract. Whenever a new topic is created, users can add a schema associated with that topic, and each schema is assigned a global ID. The schema ensures that data pushed to a schema-driven topic is in the format the schema defines; if the data does not conform, the write fails. There are also topics that don't need any schema defined for them. These are free-form topics that accept data in any shape.
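As an illustration (not part of the original write-up), the sketch below shows a Java producer wired to Schema Registry so that every record it sends is validated against a registered Avro schema. It assumes the Confluent kafka-avro-serializer dependency is on the classpath, a broker at localhost:9092, and Schema Registry at localhost:8081; the topic name and schema are made up for the example.

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // assumed broker
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081"); // assumed registry address

        // Hypothetical schema; by default the serializer registers it under the subject "users-value"
        Schema schema = new Schema.Parser().parse(
            "{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"}]}");
        GenericRecord user = new GenericData.Record(schema);
        user.put("name", "alice");

        try (Producer<String, Object> producer = new KafkaProducer<>(props)) {
            // A record that does not conform to the registered schema is rejected at serialization time
            producer.send(new ProducerRecord<>("users", "user-1", user));
        }
    }
}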

Schema Registry exposes a REST API through which we can get schema details. Use the correct host:port to reach Schema Registry; the examples below assume localhost:8081.

  • List all schemas

curl -X GET http://localhost:8081/subjects

  • Get schema by ID

curl -X GET http://localhost:8081/schemas/ids/1

  • List all versions of a subject

curl -X GET http://localhost:8081/subjects/Kafka-value/versions

  • Get specific version

curl -X GET http://localhost:8081/subjects/Kafka-value/versions/1

  • Register new version

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" \
  --data '{"schema": "{\"type\": \"string\"}"}' \
  http://localhost:8081/subjects/Kafka-value/versions

  • Delete specific version

curl -X DELETE http://localhost:8081/subjects/Kafka-value/versions/1

  • List schema types

curl -X GET http://localhost:8081/schemas/types

  • Get subject-version pairs for a schema ID

curl -X GET http://localhost:8081/schemas/ids/2/versions

  • Partition

Kafka's topics are divided into several partitions. Partitions allow you to parallelize a topic by splitting the data for a particular topic across multiple brokers in the cluster.

Partitioning also balances data across the cluster. Usually, partitioning logic (a shared rule, such as a hash of the record key, that maps each record to a partition number) is applied before data is pushed to the topic, so that producers and consumers work hand in hand. Kafka also provides a replication factor, a fault-tolerance mechanism by which Kafka keeps copies of each partition on multiple brokers. An offset is a logical sequence number within a partition that advances with every record appended to the topic.
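As a rough illustration of key-based partitioning, here is a simplified sketch (not Kafka's exact algorithm; the default partitioner hashes the serialized key bytes with murmur2, and the partition count and keys below are assumptions for the example):

// Simplified illustration of key-based partitioning (not Kafka's exact algorithm,
// which hashes the serialized key bytes with murmur2).
public class PartitionSketch {
    static int partitionFor(String key, int numPartitions) {
        // Mask the sign bit instead of Math.abs, which is unsafe for Integer.MIN_VALUE
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 6; // assumed partition count for the topic
        // Records with the same key always land in the same partition,
        // which preserves per-key ordering.
        System.out.println(partitionFor("order-42", numPartitions));
        System.out.println(partitionFor("order-42", numPartitions)); // same partition as above
        System.out.println(partitionFor("order-43", numPartitions)); // possibly a different partition
    }
}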

Hands-on:

A] Basic: Pub-Sub

Let’s see how we can configure basic Kafka Consumer/Producer event streaming.

  • NodeJS

    • Pre-setup
  • Apache Kafka >=0.9
  • Node.js >=4
  • openssl 1.0.2
    • Setup

The node-rdkafka library is a high-performance NodeJS client for Apache Kafka that wraps the native librdkafka library. The library should encapsulate all the complexity of balancing writes across partitions and managing (possibly ever-changing) brokers. This library currently uses librdkafka version 1.2.2.

    • Initial steps

  • npm install node-rdkafka
  • var Kafka = require('node-rdkafka');
    • Kafka Producer
  1. Producer instance initialization:
var producer = new Kafka.Producer({
  'client.id': 'kafka',
  'metadata.broker.list': 'localhost:9092',
  'compression.codec': 'gzip',
  'retry.backoff.ms': 200,
  'message.send.max.retries': 10,
  'socket.keepalive.enable': true,
  'queue.buffering.max.messages': 100000,
  'queue.buffering.max.ms': 1000,
  'batch.num.messages': 1000000,
  'dr_cb': true
});

  2. Connect the producer by calling connect().

producer.connect();

  3. Subscribe to the 'ready' event on the producer and try to publish a message to Kafka.

producer.on('ready', function() {
  try {
    producer.produce('topic', null, Buffer.from('Awesome message'), 'Stormwind', Date.now());
  } catch (err) {
    console.error('A problem occurred when sending our message');
    console.error(err);
  }
});

    • Kafka Consumer
  1. Consumer instance initialization
var consumer = new Kafka.KafkaConsumer({
  'group.id': 'kafka',
  'metadata.broker.list': 'localhost:9092',
  'rebalance_cb': function(err, assignment) {
    if (err.code === Kafka.CODES.ERRORS.ERR__ASSIGN_PARTITIONS) {
      this.assign(assignment);
    } else if (err.code === Kafka.CODES.ERRORS.ERR__REVOKE_PARTITIONS) {
      this.unassign();
    } else {
      console.error(err);
    }
  }
});

  2. Connect the consumer by calling connect().

consumer.connect();

  3. Subscribe to the 'ready' and 'data' events on the consumer. On each 'data' event, we execute our business logic on the Kafka message.

consumer.on('ready', function() {
  consumer.subscribe(['librdtesting-01']);
  setInterval(function() {
    consumer.consume(1);
  }, 1000);
})
.on('data', function(data) {
  console.log('Message found! Contents below.');
  console.log(data.value.toString());
});

  • Java

    • Pre-setup
  • Apache Kafka >=0.9
  • Java
    • Setup

Maven is used for the project build and dependency management. We will use the official kafka-clients library from Apache. Import the dependency with the following snippet.

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.8.0</version>
</dependency>

    • Important packages
import java.util.Properties;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.*;

    • Kafka Producer
  1. Create a Java properties object and store all the properties of the producer in that object. These properties include our Kafka brokers.

Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "broker-1.afourtech.com:9093,"
    + "broker-2.afourtech.com:9093,"
    + "broker-3.afourtech.com:9093,"
    + "broker-4.afourtech.com:9093,"
    + "broker-5.afourtech.com:9093");

  2. If the broker requires authentication, provide your SASL credentials to be able to connect.

producerProps.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<USERNAME>\" password=\"<PASSWORD>\";");
producerProps.put("security.protocol", "SASL_SSL");
producerProps.put("sasl.mechanism", "PLAIN");
producerProps.put("ssl.protocol", "TLSv1.2");
producerProps.put("ssl.enabled.protocols", "TLSv1.2");
producerProps.put("ssl.endpoint.identification.algorithm", "HTTPS");

  3. Specify key and value serializers for serializing messages before sending them to Kafka. Since we are sending String data, we use the StringSerializer that ships with the Kafka library.

producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

  4. Once all the properties are set, we can send messages. The following example sends 1,000 messages, pausing five seconds between sends.

Producer<String, String> producer = new KafkaProducer<>(producerProps);
for (int i = 0; i < 1000; i++) {
    producer.send(new ProducerRecord<String, String>("getting-started",
        Integer.toString(i), Integer.toString(i)));
    Thread.sleep(5000);
}
producer.close();

  5. We can close our producer by calling producer.close().

    • Kafka Consumer
  1. Similar to the producer, we first have to specify the consumer’s properties.
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "broker-1.afourtech.com:9093,"
    + "broker-2.afourtech.com:9093,"
    + "broker-3.afourtech.com:9093,"
    + "broker-4.afourtech.com:9093");

// Authentication
consumerProps.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<USERNAME>\" password=\"<PASSWORD>\";");
consumerProps.put("security.protocol", "SASL_SSL");
consumerProps.put("sasl.mechanism", "PLAIN");
consumerProps.put("ssl.protocol", "TLSv1.2");
consumerProps.put("ssl.enabled.protocols", "TLSv1.2");
consumerProps.put("ssl.endpoint.identification.algorithm", "HTTPS");

// Consumer group and key-value deserializers
consumerProps.put("group.id", "KafkaExampleConsumer");
consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

  2. We will use a KafkaConsumer to consume messages; each message is represented by a ConsumerRecord.
  3. We can now subscribe to a list of topics and call poll(long timeout) to poll Kafka for new messages.
Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(Arrays.asList("topic1"));
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
        System.out.printf("offset = %d, key = %s, value = %s%n",
            record.offset(), record.key(), record.value());
}

  4. This consumer reads messages from the topic "topic1" in our Kafka cluster and prints them to the console. A possible shutdown pattern is sketched below.
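The loop above never terminates or closes the consumer. A common pattern, shown here as a hedged sketch rather than part of the original example, is to call consumer.wakeup() from a shutdown hook so the blocked poll() throws a WakeupException, then close the consumer in a finally block. It reuses the consumer built in the previous steps.

final Thread mainThread = Thread.currentThread();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    consumer.wakeup(); // makes the blocked poll() throw WakeupException
    try { mainThread.join(); } catch (InterruptedException e) { /* ignore */ }
}));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
            System.out.printf("offset = %d, key = %s, value = %s%n",
                record.offset(), record.key(), record.value());
    }
} catch (org.apache.kafka.common.errors.WakeupException e) {
    // expected on shutdown
} finally {
    consumer.close(); // leaves the group cleanly and commits offsets if auto-commit is enabled
}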

B] Advanced: Kafka Streaming Application

  • NodeJS

    • Pre-setup

  • Apache Kafka >= 0.11.x

  • Node.js >=8.x.x

    • Setup

Kafka Streams is a client library for building mission-critical real-time applications where the input and/or output data is stored in Kafka brokers. In NodeJS we will use the kafka-streams library, version 5.0.0 at the time of writing.

    • Initial steps

  • npm install --save kafka-streams

  • npm install --save node-rdkafka

  • const {KafkaStreams} = require("kafka-streams");

Kafka Config

const batchOptions = {
  batchSize: 5,
  commitEveryNBatch: 1,
  concurrency: 1,
  commitSync: false,
  noBatchCommits: false
};

const kafkaConfig = {
  noptions: {
    "metadata.broker.list": "localhost:9092", // the native client requires broker hosts to connect to
    "group.id": "kafka-streams-test-native",
    "client.id": "kafka-streams-test-name-native",
    "enable.auto.commit": false,
    "heartbeat.interval.ms": 250
  },
  tconf: {
    "auto.offset.reset": "earliest"
  },
  batchOptions
};

    • Kafka Producer

  1. The produceType setting describes what the message format should look like. There are three available types:

  • send: raw messages; the value is not changed and can be of any type.

  • buffer: gives message values a specific format {id, key, payload, timestamp, version} (requires the event to be an object).

  • bufferFormat: gives message values a specific format {id, key, payload, timestamp, version, type} (requires the event to be an object).

Use the third parameter of .to() to define the produce type, e.g. stream.to("topic", partitionCount, "send").

  2. Any single stream event can override the default settings configured in the .to() call if it carries a {key, value} object structure.

    1. By default, the whole stream event is passed as the Kafka message value when using the send produceType.

const {KafkaStreams} = require("kafka-streams");
const kafkaStreams = new KafkaStreams(kafkaConfig);
const producerStream = kafkaStreams.getKStream();

producerStream
  .from("input-topic")
  .mapJSONConvenience()
  .mapWrapKafkaValue()
  .tap(console.log)
  .wrapAsKafkaValue() // value -> {key, value, ..}
  .to("output-topic");

producerStream.start();

    2. We can also create a Kafka producer without any input topic stream.

const stream = kafkaStreams.getKStream(null);
// creating a stream without a topic is possible
stream.to("output-topic");

// wait for the Kafka producer to be ready, then write a few messages to the topic
stream.start().then(_ => {
  stream.writeToStream("my message");
  stream.writeToStream(["one more message"]);
  setTimeout(kafkaStreams.closeAll.bind(kafkaStreams), 5000);
});

  3. Further, we will explore various stream operations.

    1. Merge topics: we can merge multiple topic streams into a single stream that combines the topics' output.

const kafkaStreams = new KafkaStreams(config);
const stream1 = kafkaStreams.getKStream("my-input-topic-1");
const stream2 = kafkaStreams.getKStream("my-input-topic-2");
const stream3 = kafkaStreams.getKStream("my-input-topic-3");

// merge makes sure any message that is consumed on any of the streams
// will end up being emitted in the merged stream
const mergedStream = stream1.merge(stream2).merge(stream3);
mergedStream.filter(v => !!v); // filter out null/empty values

Promise.all([
  stream1.start(),
  stream2.start(),
  stream3.start(),
  mergedStream.to("my-merged-output-topic")
]).then(() => {
  // close after 5 seconds
  setTimeout(kafkaStreams.closeAll.bind(kafkaStreams), 5000);
});

    2. Topic as table: a KTable is a change-log representation of a topic, meaning only the latest value for each key is kept in the table.

const kafkaStreams = new KafkaStreams(config);

// creating a KTable requires a function that turns
// Kafka messages into key-value objects to store in the table
const table = kafkaStreams.getKTable("topic", keyValueMapperEtl);

function keyValueMapperEtl(kafkaMessage) {
  const value = kafkaMessage.value.toString("utf8");
  const elements = value.toLowerCase().split(" ");
  return {
    key: elements[0],
    value: elements[1]
  };
}

// consume the first 100 messages on the topic to build the table
table.consumeUntilCount(100, () => {
  // there are two ways to access the content now

  // 1. as a key-value map
  table.getTable().then(map => {
    console.log(map.fruit); // logs "strawberry" if a message "fruit strawberry" was consumed
  });

  // 2. as a replayed stream
  table.forEach(row => {
    console.log(row);
  });

  // places every key-value member of the internal map onto the stream
  table.replay();
});

table.start();

    • Kafka Consumer

  1. Kafka consumer initialization:

const {KafkaStreams} = require("kafka-streams");
const kafkaStreams = new KafkaStreams(kafkaConfig);
const consumerStream = kafkaStreams.getKStream();

consumerStream
  .from("my-topic")
  .mapBufferKeyToString()   // key: Buffer -> string
  .mapBufferValueToString() // value: Buffer -> string
  .forEach(console.log);

consumerStream.start();

  2. Call the start() method to start the stream and process the messages.

  3. By default, the key and value are byte-serialized, so we convert the bytes to strings.

  • Java

    • Pre-setup

  • Apache Kafka >=0.9

  • Java

    • Setup

Maven is used for the project build and dependency management. We will use the official kafka-clients and kafka-streams libraries from Apache. Import the dependencies with the following snippet.

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.8.0</version>
</dependency>

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>1.0.0</version>
</dependency>

    • Important Packages

import org.apache.kafka.common.config.*;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;

Stream Configuration

We have to create a streams configuration, which will be used to run the streams application.

final String bootstrapServers = "localhost:9092";
final Properties streamsConf = new Properties();

// Give the Streams application a unique name; it must be unique within the Kafka cluster
streamsConf.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-stream-app");
streamsConf.put(StreamsConfig.CLIENT_ID_CONFIG, "kafka-streams-app-client");
streamsConf.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

// Default serializer/deserializer
streamsConf.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
streamsConf.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());

    • Kafka Producer (output stream)

  1. Produce the output to the topic.

inStream.to("output-topic");

  2. Start the Streams application.

final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConf);
streams.cleanUp();
streams.start();

// Add a shutdown hook to respond to SIGTERM and gracefully close Kafka Streams
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));

    • Kafka Consumer (input stream)

  1. We create a new KStream object from the input topic. This stream reads from the Kafka topic(s) and emits each message. A fuller sketch of stream operations follows below.

final StreamsBuilder builder = new StreamsBuilder();

// Write the input data as-is to the output topic.
KStream<byte[], byte[]> inStream = builder.stream("input-topic");
// We can apply multiple operations to the input stream, e.g. flatMapValues(), map(),
// groupByKey(), count(), mapValues()
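To illustrate those operations end to end, here is a hedged sketch of the classic word-count topology. It is not part of the original example; the topic names, application id, and the switch to String serdes are assumptions made for the illustration.

import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.*;

public class WordCountSketch {
    public static void main(String[] args) {
        Properties conf = new Properties();
        conf.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-sketch");   // assumed app id
        conf.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        conf.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        conf.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> lines = builder.stream("text-input");        // assumed input topic
        lines.flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\s+"))) // split lines into words
             .groupBy((key, word) -> word)                                   // re-key each record by the word
             .count()                                                        // KTable<String, Long> of word counts
             .toStream()
             .to("word-counts", Produced.with(Serdes.String(), Serdes.Long())); // assumed output topic

        KafkaStreams streams = new KafkaStreams(builder.build(), conf);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}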

Kafka Connect

Kafka Connect is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems, using Kafka Connectors.

Kafka connectors are ready-to-use components that import or export data between Apache Kafka and external systems. Connector implementations are available for common data sources and sinks, and we can also implement our own connectors for custom functionality. Connectors and tasks are logical units of work that run inside a process; this process is called a worker in Kafka Connect. There are two modes for running workers: standalone mode and distributed mode. Identify which mode works best for your environment before getting started.

Standalone mode is useful for developing and testing Kafka Connect: a single process executes all connectors and their tasks. In production, however, you will want to run in distributed mode. Distributed mode runs Connect workers on multiple machines (nodes), which form a Connect cluster, and Kafka Connect distributes running connectors across that cluster. You can add or remove nodes as your needs evolve. Distributed mode is also more fault-tolerant: if a node unexpectedly leaves the cluster, Kafka Connect automatically redistributes that node's work to the remaining nodes. Because Kafka Connect stores connector configurations, status, and offset information inside the Kafka cluster, where it is replicated, losing the node where a Connect worker runs does not result in any lost data. Standalone mode, by contrast, has limited scalability and no automated fault tolerance out of the box when a connector goes offline.

Kafka connectors can be configured to run multiple tasks within their worker processes; tasks are the units that actually copy the data. By allowing a connector to break a single job into many tasks, Kafka Connect provides built-in support for parallel, scalable data copying with minimal configuration.

[Figure: the logical view of an input stream, a source connector, the tasks it creates, and the data flow from the source system into Kafka.]

The following JDBC source connector example is configured to run 10 tasks:

name=mysql-bulk-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=10
connection.url=jdbc:mysql://localhost/employees
connection.user=<db-user>
connection.password=<db-password>
mode=bulk
topic.prefix=mysql-

We will now run a source connector that pushes messages from a text file to a Kafka topic.

  • Source Connector Configuration

For the source connector, the reference configuration is available at $CONFLUENT_HOME/etc/kafka/connect-file-source.properties:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=path/to/file/test.txt

The first three properties are common to all connectors, while topic and file are specific to the FileStreamSource connector:

  • name – a user-specified name for the connector instance.

  • connector.class – the connector implementation class.

  • tasks.max – how many tasks of the source connector should run in parallel.

  • topic – the topic to which the connector should send the output.

  • file – the file from which the connector should read the input.

The current working directory is $CONFLUENT_HOME.

Create a basic file with some content:

echo -e "foo\nbar\n" > test.txt

  • Worker Config

Finally, configure the Connect worker. It runs our connector, reading from the source and writing to the topic. For that, we can use $CONFLUENT_HOME/etc/kafka/connect-standalone.properties:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/share/java

Note that plugin.path can hold a list of paths where connector implementations are available.

For the other parameters, we can leave the default values:

  • bootstrap.servers – addresses of the Kafka brokers.

  • key.converter and value.converter – converter classes that serialize and deserialize the data as it flows from the source into Kafka.

  • key.converter.schemas.enable and value.converter.schemas.enable – converter-specific settings.

  • offset.storage.file.filename – location at which Kafka Connect standalone should store its offset data.

  • offset.flush.interval.ms – interval at which the worker tries to commit offsets for tasks.

  • Kafka Connect in Standalone Mode

Command to start the connector in standalone mode:

$CONFLUENT_HOME/bin/connect-standalone \
  $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-source.properties \
  $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

  • Kafka Connect in Distributed Mode

In distributed mode we don't pass configuration files for each connector at startup. Instead, we start the Kafka Connect distributed process and then manage connectors via Connect's REST API.

Command to start Kafka Connect in distributed mode:

$CONFLUENT_HOME/bin/connect-distributed \
  $CONFLUENT_HOME/etc/kafka/connect-distributed.properties

Ensure that the distributed-mode process is ready to accept connector-management requests via the Kafka Connect REST interface. Verify this by executing the following curl command to retrieve the connector list.

curl http://localhost:8083/connectors

The response should be 200 OK. The Kafka Connect interface is now ready, and we can create connectors.

To create a source connector, we write the body of the POST request as a JSON file; let's name it connect-file-source.json.

{
  "name": "local-file-source",
  "config": {
    "connector.class": "FileStreamSource",
    "tasks.max": 2,
    "file": "test.txt",
    "topic": "connect-test"
  }
}

It is similar to the reference configuration file we used with the standalone worker in the previous example. We then POST it to create the source connector:

curl -d @connect-file-source.json -H "Content-Type: application/json" \
  -X POST http://localhost:8083/connectors

  • Output verification on topic

Run the Kafka console consumer on the topic defined in the source connector configuration and verify that the output matches the file content.

$CONFLUENT_HOME/bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-test --from-beginning
  • Delete connector

We can delete a connector by executing the following curl command.

curl -X DELETE http://localhost:8083/connectors/<connector-name>
  • Connector Setup: Installing Connectors from Confluent Hub

The enterprise version of Confluent provides a script for installing connectors and other components from Confluent Hub (it is not included in the open-source version). Install a connector using the following command:

$CONFLUENT_HOME/bin/confluent-hub install confluentinc/kafka-connect-mqtt:1.0.0-preview

This option is limited to the enterprise version; with the open-source version there is no such tool, so connectors must be installed manually.

  • Installing Connectors Manually

We can install connectors manually if a connector is not available on Confluent Hub or if we are using the open-source version of Confluent. To do so, download and unzip the connector, then move its libraries to the folder specified as plugin.path.

For each connector, the archive should contain two folders:

  • The lib folder contains the connector jar, e.g. kafka-connect-mqtt-1.0.0-preview.jar, plus additional jars required by the connector.

  • The etc folder holds one or more reference config files.

Move the lib folder to $CONFLUENT_HOME/share/java, or the path we specified as plugin.path in connect-standalone.properties and connect-distributed.properties.

  • Kafka Connector resources

Basic connectors are bundled with plain Apache Kafka (file and console sources and sinks) and with Confluent Platform (Elasticsearch, HDFS, JDBC, and AWS S3). Confluent Hub, a kind of app store for Kafka connectors, offers Confluent connectors (developed, tested, documented, and fully supported by Confluent) and certified connectors (developed by third parties and certified by Confluent). Confluent also provides a Connectors page listing additional community connectors.

Conclusion

We have seen how to leverage Kafka as a queue and as a streaming platform. We also explored Kafka Connect, a practical framework for integrating Kafka with external systems, and learned how to install custom connectors.

Author: Saurabha Patane & Shivam Rohila


