Experiences with Apache Kafka – Part I

Introduction

This is the first post in a series which will focus on Apache Kafka and how it can be leveraged in a Big Data environment. Kafka is typically used as the primary message ingestion point for a Hadoop streaming application, where it is often paired with Apache Storm to perform real-time event processing.

Kafka Overview

Kafka is an

  • Open source
  • Distributed
  • Partitioned
  • Replicated
  • Commit-log-based

publish-subscribe messaging system.

For the remainder of this post, I would like to establish the following standard terminology to define the components that make up a typical Kafka messaging system:

  • The key abstraction in Kafka is the Topic. At the implementation level, a Kafka topic is just a sharded write-ahead log. Topics are partitioned, and each partition is represented by an ordered, immutable sequence of messages.
  • Processes that publish messages to a Kafka topic are referred to as Producers. Producers publish data to a topic by choosing the appropriate partition within the topic; a minimal producer sketch follows this list.
  • Processes that subscribe to topics and process the published messages are called Consumers.
  • Kafka is run as a cluster composed of one or more servers, each of which is called a Broker. Topics are created within the context of broker processes.
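
To make these terms a little more concrete, here is a minimal Java producer sketch. It uses the standard Java client API; the broker address, topic name, and message contents are made up for illustration only:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class MinimalProducer {
        public static void main(String[] args) {
            // Minimal producer configuration: where the brokers live and how messages are serialized.
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 10; i++) {
                // Each record is appended to one of the partitions of the "page-views" topic,
                // which lives on the brokers of the cluster.
                producer.send(new ProducerRecord<>("page-views", "message-" + i));
            }
            producer.close();
        }
    }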

These core concepts are illustrated in the figure below:

Kafka

Communication between the clients and the servers is performed with a simple, high-performance, language-agnostic TCP protocol. As part of this blog series, we will write a number of Java, .NET C#, and Python clients.

Topics

A topic can be seen as a category, stream or feed to which a producer can publish messages. Consumers can read the published messages from the specified topics. This is illustrated in the figure below:

Topic Overview

For each topic the Kafka cluster maintains a partitioned log which looks like the following image:

topics

A partition is nothing more than an immutable, ordered sequence of messages. As messages are published, the commit log is updated. Each message in a partition is assigned a sequential ID number that uniquely identifies it within the partition. This sequence number is often referred to as the message offset.

All messages in a Kafka cluster are retained for a configurable amount of time, regardless of whether they have been consumed by any consumer. In this context, Kafka is similar to Azure’s Event Hubs messaging service. So, for example, if the message retention policy is set to five days, then for the first five days after a message is published it will remain available for consumption. After the retention window expires, the message will be discarded to free up space. The performance of Kafka is relatively constant with respect to partition size, so retaining a lot of data will not negatively affect overall performance.

The only metadata retained by the system on a per-consumer basis is the position of the consumer in the log, called the “offset”. This offset is controlled by the consumer; normally the consumer will advance its offset linearly as it reads messages, but since the consumer is in control, it can consume the messages in any order it prefers. For example, a consumer can reset to an older offset if it wishes to re-process a batch of messages.

This also implies that Kafka consumers are very lightweight, and have very little impact on the cluster’s overall performance. Since each consumer maintains its own offset, it has no ability to impact other consumers.
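
As an illustration of this offset control, below is a sketch of a consumer that rewinds to an older offset in order to re-process messages. It uses the Java consumer API; the topic, partition, and offset values are made up:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class RewindingConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "rewind-demo");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            // Take explicit control of partition 0 of the "page-views" topic ...
            TopicPartition partition = new TopicPartition("page-views", 0);
            consumer.assign(Collections.singletonList(partition));

            // ... and rewind to the beginning of that partition to re-process old messages.
            consumer.seek(partition, 0L);

            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n", record.offset(), record.value());
            }
            consumer.close();
        }
    }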

Partitions within a topic really serve two distinct purposes:

  1. They allow a topic to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions, so it can handle an arbitrary amount of data.
  2. Partitions act as the primary unit of parallelism in Kafka. They allow the messages of a topic to be consumed concurrently by multiple threads, as sketched below. I will elaborate on this subject in detail in a later post.
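
One common pattern, shown here purely as a sketch with made-up names, is to run one consumer instance per partition, each in its own thread, so that the partitions of a topic are read in parallel:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class ParallelPartitionReaders {
        public static void main(String[] args) {
            int partitionCount = 3; // assumed number of partitions of the "page-views" topic
            for (int p = 0; p < partitionCount; p++) {
                final TopicPartition partition = new TopicPartition("page-views", p);
                new Thread(() -> {
                    Properties props = new Properties();
                    props.put("bootstrap.servers", "localhost:9092");
                    props.put("group.id", "parallel-demo");
                    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

                    // Each thread owns its own consumer instance and reads exactly one partition.
                    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                    consumer.assign(Collections.singletonList(partition));
                    while (true) {
                        for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                            System.out.printf("partition=%d offset=%d value=%s%n",
                                    record.partition(), record.offset(), record.value());
                        }
                    }
                }).start();
            }
        }
    }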

The following image summarizes our insights on topics and partitions:

PartitionsSummary

Partition Distribution

The partitions of a topic are distributed over the servers in the Kafka cluster, with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.

Each partition has one server which acts as the “leader” and zero or more servers which act as “followers”. The leader handles all read and write requests for the partition, while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. A follower acts as a normal consumer: it pulls messages and updates its own log. When a follower rejoins a partition, it first re-syncs all messages, after which it is fully online again.

The leader uses the Zookeeper heartbeat mechanism to indicate that it is alive and well. Each server acts as a leader for some of its partitions and as a follower for others, so load is well balanced within the cluster.
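
You can actually observe this layout from a client: the Java client exposes per-partition metadata such as the current leader and the replica set. Below is a small sketch (the broker address and topic name are just examples) that prints this information for each partition of a topic:

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;

    public class ShowPartitionLayout {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                // For each partition, print which broker is the leader and which brokers hold replicas.
                for (PartitionInfo info : consumer.partitionsFor("page-views")) {
                    System.out.printf("partition=%d leader=%s replicas=%s in-sync=%s%n",
                            info.partition(),
                            info.leader(),
                            Arrays.toString(info.replicas()),
                            Arrays.toString(info.inSyncReplicas()));
                }
            }
        }
    }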

The basic principles of replication are summarized in the figure below:

Replication

Zookeeper

Another important component of a Kafka infrastructure is Zookeeper. While most readers will know Zookeeper from the larger Hadoop ecosystem, it is also used in stand-alone Kafka deployments.

Zookeeper serves as the coordination interface between the Kafka brokers and consumers. At its core, Zookeeper is an open source, high-performance coordination service for distributed applications. Within the Apache Hadoop ecosystem it acts as a centralized service for tasks such as:

  • Configuration Management. Zookeeper enables the cluster member nodes to “bootstrap” their configuration from a central source.
  • Group Membership. Zookeeper manages the cluster’s node status in real time and coordinates nodes joining or leaving the cluster.
  • Naming Services and Registry. Zookeeper maintains a centralized and highly available registry for the central naming services.
  • Locking and Synchronization. Zookeeper maintains any shared locks, barriers, etc.
  • Leader Switch-Over. Zookeeper maintains the shared state between partition leaders and followers, ensuring that a leader transition is seamless.

An example of how Kafka leverages Zookeeper is shown in the figure below:

Zookeeper

In this case, the producer will first call into Zookeeper to get the Kafka broker address. As the producers start streaming in messages, the broker maintains its state in Zookeeper. The consumers also track their individual message offsets through Zookeeper. This is actually done per consumer group rather than per individual consumer; more about that later.
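
For the classic high-level Java consumer (the older, Zookeeper-based API), this dependency is visible directly in the client configuration: the consumer is pointed at Zookeeper rather than at the brokers. Below is a rough sketch; the host name and group name are examples, and the exact API differs between Kafka versions:

    import java.util.Properties;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    public class ZookeeperBasedConsumer {
        public static void main(String[] args) {
            Properties props = new Properties();
            // The classic high-level consumer discovers brokers and stores offsets via Zookeeper.
            props.put("zookeeper.connect", "localhost:2181");
            props.put("group.id", "order-processors");
            props.put("auto.commit.interval.ms", "1000");

            ConsumerConnector connector =
                    Consumer.createJavaConsumerConnector(new ConsumerConfig(props));

            // ... create message streams from the connector and consume them (omitted here) ...

            connector.shutdown();
        }
    }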

Producers

Producers publish their data to the topics of their choice. The producer is responsible for choosing which message to assign to which partition within the topic. This can be done in two different ways:

  1. Producers can publish messages to the topic’s partitions in a round-robin fashion, thereby automatically balancing the load.
  2. Producers can select the partition by means of a “semantic partition function”, which is typically based upon a key contained in the message, as sketched below.
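
With the Java producer, the simplest form of such a semantic partition function is to give each record a key: the default partitioner hashes the key, so all records with the same key end up in the same partition, while records without a key are spread across the partitions. A small sketch, with a made-up topic, key, and broker address:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class KeyedProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            Producer<String, String> producer = new KafkaProducer<>(props);

            // No key: the default partitioner spreads these records over the topic's partitions.
            producer.send(new ProducerRecord<>("orders", "an order without a key"));

            // With a key: all records for customer-42 are hashed to the same partition,
            // which also preserves their relative ordering.
            producer.send(new ProducerRecord<>("orders", "customer-42", "an order for customer 42"));

            producer.close();
        }
    }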

Consumers

In messaging we have typically recognized two core models:

  1. Queuing
  2. Publish-subscribe

In a queuing model, a pool of consumers can read from a queue, but a single message only goes to exactly one consumer. This model is shown in the figure below:

queues

In the above queuing system, we have 3 messages (in this case messages containing orders). Each message will be delivered to exactly one consumer. This model is typically used to achieve load balancing across a distributed system.

In a publish-subscribe model the message is broadcast to all subscribing consumers, as is shown below:

PublishSubscribe

With the concept of the Consumer Group Kafka offers a single consumer abstraction that generalizes queuing and publish-subscribe.

In this model, consumers label themselves with a consumer group name, and each message published to the topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes on separate machines, as is shown in the figure below:

ConsumerGroups

If all the consumer instances have the same consumer group, then this model works just like a traditional queue, balancing the load over the consumers.

On the other hand, if all the consumer instances have different consumer groups, then the model works like traditional publish-subscribe and all messages are broadcast to all consumers.

In most scenarios, a topic will have a small number of consumer groups, one for each logical subscriber. Each group is composed of many consumer instances for scalability and fault tolerance. Effectively, this enforces publish-subscribe semantics where the subscriber is a cluster of consumers instead of a single process.
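
In the Java consumer API, the consumer group is simply the group.id setting. The sketch below (group name, topic, and broker address are examples) joins a group and subscribes to a topic; running several copies with the same group.id spreads the partitions over the instances (queuing behaviour), while giving each copy its own group.id delivers every message to every copy (publish-subscribe behaviour):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class GroupMember {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "order-processors"); // the consumer group this instance belongs to
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("orders"));

            // Each message published to "orders" is delivered to exactly one member of the
            // "order-processors" group; other groups receive their own copy of every message.
            while (true) {
                for (ConsumerRecord<String, String> record : consumer.poll(1000)) {
                    System.out.printf("group member got offset=%d value=%s%n",
                            record.offset(), record.value());
                }
            }
        }
    }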

Guarantees

The Kafka messaging system provides the following guarantees:

  • Messages sent by a producer to a particular topic partition will be appended in the order that they were sent. If a message Msg1 is sent by the same producer as message Msg2, and Msg1 is sent first, then Msg1 will have a lower offset than Msg2 and appear earlier in the log.
  • A consumer instance sees messages in the order they are stored in the log.
  • For a topic with replication factor N, Kafka will tolerate up to N-1 server failures without losing any messages committed to the log.

Use Cases

A number of popular use cases for Kafka are listed below:

  • Messaging. Kafka tends to work very well as a replacement for a more traditional message broker. In comparison to most messaging systems, Kafka has better throughput, built-in partitioning, replication, and inherent fault tolerance, which makes it a good fit for large-scale message processing applications.
  • IoT Application Analytics. Kafka is a good fit as a message-ingestion point for an IoT analytics sub-system. Telemetry events are routed to Kafka topics and are consumed downstream by a real-time analytics engine such as Apache Storm. Within this blog series we will perform a deep dive into this topic.
  • Log Aggregation. A number of applications use Kafka as a replacement for a log aggregation solution. Log aggregation typically collects physical log files from remote servers and centralizes them in one location so they can be processed. Kafka provides a clean abstraction of log data as a stream of messages instead of a traditional file. This enables lower-latency processing, easier support for multiple data sources, distributed data consumption, and stronger delivery guarantees than most file-based aggregation solutions.
  • Metrics. Kafka is often used for operational monitoring data. This involves aggregating statistics from distributed applications to produce centralized feeds of operational data. Indeed, Kafka is a good fit for collecting telemetry about a system, where log data from different instances of the application is aggregated into an HDFS-based back end.

What is next?

In the next post of this series, we will install Kafka on a “clean” Ubuntu distribution. We will create a number of topics, and run both a message consumer and producer.

 

Using the Hadoop HDFS Command Line

Introduction

The Hadoop Distributed File System (HDFS) includes an easy-to-use command line interface which you can use to manage the distributed file system. You will need access to a working Hadoop cluster if you want to follow along. If you don’t have direct access to a cluster, you can create a local Hortonworks or Cloudera single-node cluster, as described in my earlier post: Getting Started with Enterprise Hadoop.

Logging in to the cluster

Since we will be using the command line interface here, we need to be able to log into our cluster. In my case I am using the Hortonworks Sandbox VM, running in VirtualBox. On my local machine, I created the hostname hw_sandbox by adding an entry that maps the sandbox VM's IP address to that name in my C:\Windows\System32\drivers\etc\hosts file.

The generic syntax for using ssh is as follows:

    ssh <username>@<hostname>

This is what I use to log into my Hortonworks sandbox:

    ssh root@hw_sandbox

Note that, unlike me, you should not be using the root user name if you are on a real cluster!

Generic Command Structure

When you look for the right command line syntax to use, you might get different answers. Three different command lines are supported. They appear to be very similar, but they have subtle differences:

  1. hadoop fs <arguments>
  2. hadoop dfs <arguments>
  3. hdfs dfs <arguments>

Let’s take a look at the command structure for each one. First, the hadoop fs command line:

    hadoop fs -ls /user

FS relates to any generic file system, which could be the local file system, HDFS, FTP, S3, etc. Most of the Hadoop distributions are configured to default to HDFS if you specify hadoop fs. You can explicitly specify HDFS in the hadoop command line, like so:

    hadoop dfs -ls /user

This explicitly specifies HDFS, and would work for any HDFS operation. However, this command has been deprecated. You should use the hdfs dfs syntax instead, as is shown below:

    hdfs dfs -ls /user

This syntax works for all operations against HDFS, and it is the recommended command to use instead of hadoop fs or hadoop dfs. Indeed, if you use the hadoop dfs syntax, it will locate hdfs and delegate the command to hdfs dfs.

Creating a directory in HDFS

The syntax for creating a directory is shown below:

    hdfs dfs -mkdir <path>

You can specify a path using two different syntax notations, as is shown below:

  • You can use a standard local path (for example /user/bennie)
  • You can use the hdfs:// URL syntax (for example hdfs://sandbox.hortonworks.com/user/bennie).

Note: depending on the configuration of your cluster, you might have to specify a port in addition to the host name.

Below is an example of these commands running in the Hortonworks sandbox bash shell:

    hdfs dfs -mkdir /user/bennie
    hdfs dfs -mkdir hdfs://sandbox.hortonworks.com/user/bennie

Uploading a file and Listing Contents

Let’s create a local file; we’ll use echo in this case (the file name is just an example):

    echo "Hello HDFS" > myfile.txt

You have a variety of commands available to upload a file to HDFS; I am using the put command here:

    hdfs dfs -put myfile.txt /user/bennie/

Next, we can list the contents of our directory with the ls command. Again, we can use the standard file system notation, or we can use the hdfs:// notation, as is shown below:

    hdfs dfs -ls /user/bennie
    hdfs dfs -ls hdfs://sandbox.hortonworks.com/user/bennie

Downloading a File from HDFS

We can use the get command to download a file from HDFS to our local file system:

    hdfs dfs -get /user/bennie/myfile.txt myfile.copy.txt
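
For completeness: the same mkdir, put, ls and get operations can also be performed programmatically through the HDFS Java API instead of the shell. The sketch below is only an illustration; it assumes it runs on a node where the Hadoop configuration is on the classpath, and it re-uses the example paths from above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsFileOperations {
        public static void main(String[] args) throws Exception {
            // Picks up fs.defaultFS (and the rest of the cluster settings) from the classpath.
            Configuration conf = new Configuration();
            FileSystem fs = FileSystem.get(conf);

            // Equivalent of: hdfs dfs -mkdir /user/bennie
            fs.mkdirs(new Path("/user/bennie"));

            // Equivalent of: hdfs dfs -put myfile.txt /user/bennie/
            fs.copyFromLocalFile(new Path("myfile.txt"), new Path("/user/bennie/myfile.txt"));

            // Equivalent of: hdfs dfs -ls /user/bennie
            for (FileStatus status : fs.listStatus(new Path("/user/bennie"))) {
                System.out.println(status.getPath());
            }

            // Equivalent of: hdfs dfs -get /user/bennie/myfile.txt myfile.copy.txt
            fs.copyToLocalFile(new Path("/user/bennie/myfile.txt"), new Path("myfile.copy.txt"));

            fs.close();
        }
    }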

Getting Started with Enterprise Hadoop

So, you decided that you would like to learn Hadoop, but you have no idea where to get started? You could go to the Apache Hadoop download page, download the most recent distribution and install it on your system. But be forewarned: doing a local install directly from the Apache distribution is not for the faint of heart, and it can be extremely time-consuming.

An easy way to get started with Big Data and Enterprise Hadoop is to leverage the sandbox virtual machine images offered by both Cloudera and Hortonworks. In this post, I will perform a quick walk-through of both options.

Using the Hortonworks Sandbox

An easy way to get started with Big Data and Enterprise Hadoop is to leverage the Hortonworks Sandbox. The sandbox is a personal, portable Hadoop environment, containing the latest HDP distribution which runs in a virtual machine.

You have two deployment options with the Hortonworks Sandbox:

  1. You can deploy it as a standalone VM on your local computer.
  2. You can run the Sandbox as a cloud-based virtual machine.

I recommend using the first option if you have a more powerful local computer with at least 8 GB of RAM (ideally 16 GB or more). The cloud-based option is more appropriate if you have a smaller system, or if you need access to the sandbox from multiple devices.

If you decide to install the sandbox locally, you have a choice of three different virtualization environments:

  • Oracle’s VirtualBox.
  • VMWare
  • Hyper-V on Windows

I deployed the sandbox on VirtualBox, and it was a very quick and easy process, although you do need to allow for some time to download the image from the Hortonworks Web site.

Using the Cloudera QuickStart VMs

The Cloudera QuickStart is very similar to the Hortonworks Sandbox offering. The QuickStart VMs contain a single-node Apache Hadoop cluster, complete with example data, queries, scripts, and Cloudera Manager.

The Quickstart supports the following virtualization environments:

  • Oracle’s VirtualBox.
  • VMWare
  • KVM

I deployed the QuickStart on my local machine’s VirtualBox, and again the process was very straightforward.

Conclusion

Both the Hortonworks Sandbox and the Cloudera QuickStart are a great, painless way of getting started with Enterprise Hadoop. I have noticed that I don’t just use them for learning, but also for experimenting with new projects and new features in a risk-free environment. I would definitely recommend giving either (or both) a spin. Which one you want to use depends on your platform preferences. I tend to like a more “pure” Apache distribution without any proprietary add-ons, so I tend to work more with the Hortonworks Sandbox.