Apache Kafka Part IV – Writing Producers

Introduction

In the previous posts in this series we provided a brief overview of Kafka, described its architecture, and explained how to install Kafka in several configurations on an Ubuntu Linux distribution. A listing of these posts is provided below:

  1. Experiences with Apache Kafka provided us with an architectural overview of Kafka.
  2. In the Ubuntu installation post we installed Kafka in a single node, single broker configuration.
  3. In the Multiple Brokers post we investigated how to run a multi-broker Kafka installation.

In this post, we will finally get our hands dirty and start writing Kafka producers, followed by consumers in later posts.

Writing Kafka Producers – An Overview

A Kafka producer is an application which creates messages and publishes them to a Kafka broker for downstream processing by a consumer. Kafka producers can be many types of applications, a few of which are shown below:

  • Internet of Things (IoT) applications. The edge devices (smart sensors and/or devices) of these systems generate a high volume of telemetry and command-and-control messages. These applications need a scalable, high-volume message ingestion infrastructure which can sustain messaging rates often exceeding one million messages per second.
  • Back-end services which need to collect activity stream and operational metrics. A typical example of this is LinkedIn, which powers its LinkedIn news feed, LinkedIn Today and offline analytics through Kafka.
  • Enterprise Distributed Messaging System scenarios. More and more, Kafka is being used as an alternative to traditional enterprise messaging systems such as RabbitMQ and ActiveMQ.

In this post we will write a Java-based Kafka producer running on Ubuntu. In later posts, we will focus on writing producers in different languages and running on diverse platforms, such as Python and Ruby on Linux, and C# running on a .Net platform.

The diagram below outlines the different steps involved in a typical Kafka Producer workflow:

[Figure: Kafka producer workflow]

The producer connects to any live broker and requests metadata about the leaders for the partitions of a topic. This allows the producer to send each message directly to the lead broker for its partition. Remember, the leader is the node responsible for all reads and writes for a given topic partition.

The Kafka producer API exposes an interface for semantic partitioning by allowing the producer to specify a key to partition by, which is hashed to determine the target partition. Thus, the producer can completely control which partition it publishes messages to. For example, if Product ID is selected as the key, then all messages for a given product will be sent to the same partition. This also allows data consumers to make locality assumptions about the data they read. Alternatively, a producer can choose to distribute messages over the different partitions in a “round robin” fashion.
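To make this concrete, here is a minimal sketch using the new Java producer API introduced later in this post; the broker address, topic name and key are hypothetical examples:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

public class KeyedSendExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Hypothetical broker address; adjust to your own cluster.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

        // Every message with the same key (here the Product ID "product-42")
        // is hashed to the same partition of the "product-events" topic.
        producer.send(new ProducerRecord<String, String>("product-events", "product-42", "price updated"));

        producer.close();
    }
}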

For higher efficiency, Kafka producers can also publish messages in batches, which works in asynchronous mode only. In asynchronous mode, the producer buffers messages until either a fixed number of messages has accumulated or a fixed latency has elapsed, as defined by the following producer configuration properties:

  1. queue.time is used to define the latency.
  2. batch.size is used to define the message batch size.

In this case, data is accumulated in memory at the producer’s end and published to the broker in batches in a single request. Asynchronous mode also brings the risk of losing data if the producer crashes while accumulated messages are still unpublished and in memory.
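These property names apply to the older producer API; in the new Java producer that we will use later in this post, batching is controlled by the batch.size and linger.ms settings instead. A minimal sketch, with example values only:

import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class BatchingConfigExample {
    public static Properties batchingProperties() {
        Properties props = new Properties();
        // Example broker address; adjust to your own cluster.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Accumulate up to 16 KB of messages per partition before sending a batch.
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Wait up to 100 ms for more messages to fill a batch before sending.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 100);
        return props;
    }
}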

In the next section, we will discuss the API provided by Kafka for writing Java-based custom producers.

The Java Producer API

Coming soon… this section is still in progress.

Creating the Eclipse Maven project skeleton

In this section, we will create the Producer skeleton project in Eclipse as follows:

  • Start Eclipse. I am currently running the 3.8.1 version.
  • Go to File | New | Other | Maven | Maven Project. If you don’t see the Maven option, you probably don’t have the M2E Maven plug-in installed. In this case you first need to install the plug-in. A good reference is the following Stack Overflow post: http://stackoverflow.com/questions/8620127/maven-in-eclipse-step-by-step-installation
  • In the New Maven Project dialog, select Create a simple project as is shown below:

[Screenshot: New Maven Project dialog]

  • In the next screen, fill in the Group Id, Artifact Id and Version as is shown below:

[Screenshot: New Maven project settings with Group Id, Artifact Id and Version]

  • Click Finish to create the project.

We now have a Maven project with a pom.xml file in our Eclipse workspace. In a terminal window, navigate to the project home directory and run mvn clean install to ensure that the build works as expected:

[Screenshot: output of mvn clean install]

Adding the Kafka dependencies to the pom.xml file

Next, we need to modify our pom.xml file to add the dependencies for Kafka. To get the latest Maven Kafka dependencies, use the following URL: http://mvnrepository.com/artifact/org.apache.kafka.

In my case, I selected the 0.8.2.1 version. Copy the XML snippet, and add it to your pom.xml file, as is shown below:
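A dependency entry along these lines should work; this sketch assumes the kafka-clients artifact, which contains the new Java producer used later in this post:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.1</version>
</dependency>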

At this point, it is also a good idea to add the slf4j logging dependency, since we will be using it later. You can find this artifact at: http://mvnrepository.com/artifact/org.slf4j. I selected the 1.7.12 version. The updated dependencies section of our pom.xml now looks as follows:
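As a sketch, the dependencies section might now look like this (the slf4j-simple binding is an assumption; any slf4j binding will do for console output):

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.1</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.12</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.12</version>
    </dependency>
</dependencies>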

At this point, it might again be a good idea to make sure that you have a working build by running mvn clean install in your terminal window.

We still need to fix up some items:

Add a properties section, containing a sourceEncoding property, to get rid of the “platform encoding” build warning:
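One common way to do this is:

<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>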

We also need to make sure that we add a <packaging> node with a value of “jar”, as is shown below:
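This is a single element at the project level of the pom.xml:

<packaging>jar</packaging>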

Finally, we need to set the JDK version. We have version 1.7 of the JDK installed on our image, so we can add a <build> section with the maven-compiler-plugin, and the JDK version set to 1.7:
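A sketch of such a build section follows; the plug-in version shown here is an assumption, so use whichever recent version Maven resolves for you:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.3</version>
            <configuration>
                <source>1.7</source>
                <target>1.7</target>
            </configuration>
        </plugin>
    </plugins>
</build>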

We now have a clean, stable Maven project, enabling us to start focusing on our Kafka producer code in the next section.

Creating the Simple Producer

First, we need to add a new Java package for our Producer. In the Eclipse Package Explorer, right-click the /src/main/java node and select New | Package. Enter com.bhaelen.blogpost.kafka.SimpleProducer as the package name and click Finish.

Next, add a new class named SimpleProducer to the package. This will be the class that contains our main producer code.

Importing Classes

All Java Kafka producers need to import the classes listed below. Edit the SimpleProducer.java class, and add the following statements:
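Assuming the new Java producer API discussed below, the imports look like this:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;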

Note: You might have seen documentation that imports the kafka.* packages, like so:
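Those older examples typically use imports from the original Scala-based client, along these lines:

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;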

Before Kafka 0.8.2, kafka.javaapi.producer.Producer was the only official Java client (producer), and it was implemented in Scala. From Kafka 0.8.2 onward, there is a new Java producer API, org.apache.kafka.clients.producer.KafkaProducer, which is fully implemented in Java. This client is production-tested and generally both faster and more fully featured than the previous Scala client.

Defining properties

Next, we need to define a set of properties which specify what brokers we will be using. We will be using the same 3 broker configuration as specified in the previous post. The complete property set is shown in the code below:
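A minimal sketch of this code is shown below; the host and port values are assumptions based on the three-broker setup from the previous post, so adjust them to match your own brokers:

public class SimpleProducer {

    public static void main(String[] args) {
        Properties props = new Properties();

        // List of brokers (in <node:port>,<node:port> format) used to bootstrap cluster metadata.
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "localhost:9092,localhost:9093,localhost:9094");

        // Serializers that turn the message key and value into bytes for transmission.
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");

        // Ask the broker to acknowledge once the lead replica has received the message.
        props.put(ProducerConfig.ACKS_CONFIG, "1");

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
        // Sending messages with this producer will be covered next.
        producer.close();
    }
}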

Following is a brief description of the code:

  • First, we set the BOOTSTRAP_SERVERS_CONFIG property. This property specifies the list of brokers (in the [<node:port>, <node:port>] format) that the producer needs to connect to. Kafka producers automatically determine the lead broker for a topic partition by raising a metadata request, and connect to the correct broker before publishing any message.
  • Next, we set the VALUE_SERIALIZER_CLASS_CONFIG and KEY_SERIALIZER_CLASS_CONFIG properties, which indicate what serializer should be used while preparing the message for transmission from the producer to the broker. In our case we will use the StringSerializer class.
  • Finally, the ACKS_CONFIG property instructs the Kafka broker to send an acknowledgement to the producer when a message is received. The value of 1 means the producer receives an acknowledgement once the lead replica has received the data. This option provides better durability, as the producer waits until the broker acknowledges the request as successful. With acks set to 0, the producer works in “fire and forget” mode and is not informed in the case of message loss.

More coming soon…
