Apache Kafka Part IV – Writing Producers

Introduction

In the previous posts in this series, we provided an architectural overview of Kafka and described how to install it in multiple configurations on an Ubuntu Linux distribution. A listing of these posts is provided below:

  1. Experiences with Apache Kafka provided us with an architectural overview of Kafka.
  2. In the Ubuntu installation post we installed Kafka in a single node, single broker configuration.
  3. In the Multiple Brokers post we investigated how to run a multi-broker Kafka installation.

In this post, we will finally get our hands dirty and start writing Kafka producers, followed by consumers in later posts.

Writing Kafka Producers – An Overview

A Kafka producer is an application which creates messages and publishes them to a Kafka broker for downstream processing by a consumer. Kafka producers come in many forms, a few of which are shown below:

  • Internet of Things (IOT) applications. The edge devices (smart sensors and/or devices) of these systems generate a high volume of telemetry and command-and-control messages. These applications need a scalable, high-volume message ingestion infrastructure which can facilitate messaging rates often exceeding one million messages per second.
  • Back-end services which need to collect activity stream and operational metrics. A typical example of this is LinkedIn, which powers its LinkedIn news feed, LinkedIn Today and offline analytics through Kafka.
  • Enterprise Distributed Messaging System scenarios. More and more, Kafka is being used as an alternative to traditional enterprise messaging systems such as RabbitMQ and ActiveMQ.

In this post we will write a Java-based Kafka producer running on Ubuntu. In later posts, we will focus on writing producers in different languages and on diverse platforms, such as Python and Ruby on Linux, and C# on the .NET platform.

The diagram below outlines the different steps involved in a typical Kafka Producer workflow:

KafkaProducerWorkflow

The producer connects to any of the live nodes and requests metadata about the leaders for the partitions of a topic. This allows the producer to send messages directly to the lead broker for the partition. Remember, the leader is the node responsible for all reads and writes for a given topic partition.

The Kafka producer API exposes an interface for semantic partitioning by allowing the producer to specify a key to partition by, and using that key to hash to a partition. Thus, the producer can completely control which partition it publishes messages to. For example, if Product ID is selected as the key, then all messages for a given product will be sent to the same partition. This also allows consumers to make locality assumptions about the data. Alternatively, a producer can choose to distribute messages over the different partitions in a “round robin” fashion.
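As an illustration of keyed partitioning, here is a minimal sketch using the new Java producer API that we adopt later in this post; the broker address, topic name and key are assumptions, not part of the original example:

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class KeyedSendSketch {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");   // assumed local broker
            props.put("key.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer",
                      "org.apache.kafka.common.serialization.StringSerializer");

            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            // Every message with the same key ("product-42") hashes to the same partition.
            producer.send(new ProducerRecord<String, String>("telemetry", "product-42", "price updated"));
            producer.close();
        }
    }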

For higher efficiency, Kafka producers can also publish messages in batches, which works in asynchronous mode only. In asynchronous mode, the producer works with either a fixed number of messages or a fixed latency, defined by the following producer configuration properties:

  1. queue.time is used to define the latency.
  2. batch.size is used to define the message batch size

In this case, data is accumulated in memory at the producer’s end and published to the broker in batches in a single request. Asynchronous mode also brings the risk of losing data if the producer crashes while accumulated, unpublished messages are still in memory.

In the next section, we will discuss the API provided by Kafka for writing Java-based custom producers.

The Java Producer API

Coming soon… post in progress..

 Creating the Eclipse Maven project skeleton

In this section, we will create the Producer skeleton project in Eclipse as follows:

  • Start Eclipse. I am currently running the 3.8.1 version.
  • Go to File | New | Other | Maven | Maven Project. If you don’t see the Maven option you probably don’t have the M2E Maven plug-in installed. In this case you first need to install the plug-in. A good reference is the following Stack Overflow post: http://stackoverflow.com/questions/8620127/maven-in-eclipse-step-by-step-installation
  • In the New Maven Project dialog, select Create a simple project as is shown below:

NewMavenProject

  • In the next screen, fill in the Group Id, Artifact Id and Version as is shown below:

NewMavenKafkaProject

  • Click Finish to create the project.

We now have a Maven project with a pom.xml file in our Eclipse workspace. In a terminal window, navigate to the project home directory and run mvn clean install to ensure that the build works as expected.
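Roughly, the commands look like this (the workspace path is hypothetical):

    cd ~/workspace/kafka-producer
    mvn clean install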

Kafka mvn clean install

Adding the Kafka dependencies to the pom.xml file

Next, we need to modify our pom.xml file to add the dependencies for Kafka. To get the latest Maven Kafka dependencies, use the following URL: http://mvnrepository.com/artifact/org.apache.kafka.

In my case, I selected the 0.8.2.1 version. Copy the XML snippet, and add it to your pom.xml file, as is shown below:
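A hedged sketch of that dependency, assuming the new kafka-clients artifact (the post uses the new Java producer later on; if you prefer the older Scala-backed client, the artifact would be kafka_2.10 instead):

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>0.8.2.1</version>
    </dependency>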

At this point, it is also a good idea to add the slf4j logging dependency, since we will be using it later. You can find this artifact at: http://mvnrepository.com/artifact/org.slf4j. I selected the 1.7.12 version. The updated dependencies section of our pom.xml now looks as follows:
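A sketch of the updated section; slf4j-api is an assumption for the logging artifact (you may also want a binding such as slf4j-simple):

    <dependencies>
      <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.1</version>
      </dependency>
      <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.12</version>
      </dependency>
    </dependencies>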

At this point, it might again be a good idea to make sure that you have a working build by running mvn clean install in your terminal window.

We still need to fix up some items, all of which are shown in the combined snippet below:

Add a properties section, containing a sourceEncoding property, to get rid of a “platform encoding” warning.

We also need to make sure that we add a <packaging> node with a value of “jar”.

Finally, we need to set the JDK version. We have version 1.7 of the JDK installed on our image, so we can add a <build> section with the maven-compiler-plugin and the JDK version set to 1.7:
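A hedged sketch of all three fixes (the plugin version and the UTF-8 encoding value are assumptions; the <packaging> node goes at the top level of the pom, next to the groupId and artifactId):

    <packaging>jar</packaging>

    <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <build>
      <plugins>
        <plugin>
          <groupId>org.apache.maven.plugins</groupId>
          <artifactId>maven-compiler-plugin</artifactId>
          <version>3.3</version>
          <configuration>
            <source>1.7</source>
            <target>1.7</target>
          </configuration>
        </plugin>
      </plugins>
    </build>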

We now have a clean, stable Maven project, enabling us to start focusing on our Kafka producer code in the next section.

Creating the Simple Producer

First, we need to add a new Java package for our Producer. In the Eclipse Package Explorer, right click the /src/main/java node and select New | Package. Enter com.bhaelen.blogpost.kafka.SimpleProducer as the package name and click Finish.

Next, add a new class named SimpleProducer to the package. This will be the class that contains our main producer code.

Importing Classes

All Java Kafka producers need to import the classes listed below. Edit the SimpleProducer.java class, and add the following statements:
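A minimal sketch of the imports needed for the new Java producer API (this is not the author’s original listing):

    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;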

Note: You might have seen documentation that imports the kafka.* packages, like so:
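For example, the older Scala-backed client is typically imported as follows (shown only for comparison):

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;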

Before Kafka 0.8.2, kafka.javaapi.producer.Producer was the only official Java client (producer), and it was implemented in Scala. From Kafka 0.8.2 onward, there is a new Java producer API, org.apache.kafka.clients.producer.KafkaProducer, which is fully implemented in Java. This client is production tested and generally both faster and more fully featured than the previous Scala client.

Defining properties

Next, we need to define a set of properties which specify the brokers we will be using. We will be using the same three-broker configuration as specified in the previous post. The complete property set is shown in the code below:

Following is a brief description of the code:

  • First, we set the BOOTSTRAP_SERVERS_CONFIG property. This property specifies the list of brokers (in the [<node:port>, <node:port>] format) that the producer needs to connect to. Kafka producers automatically determine the lead broker for the topic partition by raising a request for the metadata, and connect to the correct broker before publishing any message.
  • Next, we set the VALUE_SERIALIZER_CLASS_CONFIG and KEY_SERIALIZER_CLASS_CONFIG properties, which indicate what serializer should be used while preparing the message for transmission from the producer to the broker. In our case we will use the StringSerializer class.
  • Finally, the ACKS_CONFIG property instructs the Kafka broker to send an acknowledgement to the producer when a message is received. A value of 1 means the producer receives an acknowledgement once the lead replica has received the data. This option provides better durability, as the producer waits until the broker acknowledges the request as successful. By default, the producer works in “fire and forget” mode and is not informed in the case of message loss.
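The following is a minimal sketch of the property setup described above, assuming the imports listed earlier; it is not the author’s original listing, the broker ports follow the multi-broker post (9092–9094), and the topic name and message are assumptions:

    public class SimpleProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                      "localhost:9092,localhost:9093,localhost:9094");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                      "org.apache.kafka.common.serialization.StringSerializer");
            props.put(ProducerConfig.ACKS_CONFIG, "1");

            // Create the producer and send a single message.
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);
            producer.send(new ProducerRecord<String, String>("telemetry-replicated", "key-1", "Hello Kafka"));
            producer.close();
        }
    }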

more coming soon….

Apache Kafka Part III – Multiple Brokers

Introduction

In my previous post, we installed Kafka in a single node, single broker configuration. In this post, we will explore what it takes to run a multi-broker configuration. We will keep working in our Ubuntu environment, and we will leave our existing broker in place. We will add two additional broker instances on our local VM instance, which will result in the following configuration:

Kafka - Multiple Brokers

If you are continuing to follow along from my previous post, I recommend that you reboot your Ubuntu VM instance before working through the example; this way we are sure that all ports are freed up.

Starting Zookeeper

The ZooKeeper starting procedure and configuration files remain the same, so you can start the ZooKeeper server as follows:
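Roughly, assuming the stock Kafka 0.8.2 scripts:

    bin/zookeeper-server-start.sh config/zookeeper.properties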

Starting the additional Kafka brokers

To get to three brokers, we need two additional instances on top of the one broker we already have defined with the config/server.properties file. Different server properties are required for each broker, and each property file needs to define unique values for the following properties:

  • broker.id – the unique ID of the broker instance
  • port – the port that the broker will be listening on.
  • log.dir – the directory for the broker’s log

The easiest way to implement this is to copy the existing config file into two new config files (one for each new broker):
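For example:

    cp config/server.properties config/server-1.properties
    cp config/server.properties config/server-2.properties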

Next, we need to edit both files and set the following properties:

config/server-1.properties:

config/server-2.properties:
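A sketch of the edited values for both files (the exact ports and log directories are assumptions, following the standard Kafka quickstart convention):

    # config/server-1.properties
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1

    # config/server-2.properties
    broker.id=2
    port=9094
    log.dir=/tmp/kafka-logs-2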

We can follow a similar procedure for each new broker. Of course, in production environments, broker instances will run on different machines, but this gives us a feel for what needs to be done.

We can now start each broker in a new terminal window.

Broker 1 (default broker):

Second broker:

And finally the third broker:
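Assuming the stock scripts, the three start commands look roughly like this:

    # Broker 1 (default broker)
    bin/kafka-server-start.sh config/server.properties

    # Second broker
    bin/kafka-server-start.sh config/server-1.properties

    # Third broker
    bin/kafka-server-start.sh config/server-2.properties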

Creating a replicated topic

Since we now have three different brokers running, let’s create a new topic with a replication factor of three:
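Roughly, using the stock kafka-topics.sh utility (the topic name telemetry-replicated is taken from the rest of this post):

    bin/kafka-topics.sh --create --zookeeper localhost:2181 \
        --replication-factor 3 --partitions 1 --topic telemetry-replicated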

To check which broker is doing what we can use the describe topics command as follows:
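Roughly:

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic telemetry-replicated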

The output of the describe topics command is shown below:

DescribeTopics

The first output line gives a summary of all partitions, showing the topic name, partition count and the replication factor:

Each additional line gives information about one partition. Since we only have one partition for this topic there is only one line:

  • The Leader is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions. In our case we see that our last broker (with broker.id 2) is the leader.
  • The Replicas is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive. Here we see that all brokers replicate the topic.
  • Isr is the set of “in-sync” replicas. This is the subset of replicas that are currently alive and caught up to the leader. In this case we see that replicas are in sync.

Starting a producer to send messages

Let’s publish a few messages to our new telemetry-replicated topic:
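Using the console producer, roughly (sending to the first broker on port 9092):

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic telemetry-replicated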

SendToReplicatedTopic

Note that we sent the messages to the first broker, which listens on port number 9092.

Consuming the messages

Now, let’s consume the published messages:
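With the console consumer, roughly:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic telemetry-replicated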

ConsumingMessages

So, we see that everything still works as we expected. Let’s make things a bit more interesting in the next section and test our fault tolerance.

Testing Fault Tolerance

We know that the broker with ID = 2 was the leader. This is the last broker. Let’s go ahead and kill that instance. First we need to get the Linux process id of the instance:

Since we used config/server-2.properties to start the broker, we can grep the ps output on that. This produces the following output:

We need to use the second process id, since this is the real server process (the first is simply the shell script that started the process). Now that we have the process id we can kill it:
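A hedged sketch of both steps (the actual process id will differ on your machine):

    ps aux | grep server-2.properties
    kill -9 <pid-of-the-kafka-server-process>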

The full terminal output is shown below:

KillProcessFullOutput

Now, let’s do a describe topics again. This time we get the following output:

We see a few things now:

  • Leadership has switched to the first broker (with id = 0)
  • The in-sync replica set no longer contains the broker that was killed (with id = 2)

But because Kafka topics are replicated, the messages are still available even though the leader that originally took the writes is down. We can test this by starting the consumer again:

As we can see, all messages are still there and are ready to be consumed.

Conclusion

In this post, we created a single-node, multiple broker configuration to get a feel for how to work with a Kafka cluster. We learned that each broker has its own config file with a unique broker id, port and log directory.

We then created a replicated topic and published and consumed messages. We tested fault-tolerance by “shooting” our  leading broker, and saw that because of the replication, the published messages remain highly available, even if they were published to the killed leader.

In the next couple of posts, we will start writing our own consumers. We’ll start out in Java, and then move on to Python and .NET languages.


Apache Kafka Part II – Ubuntu Installation

Introduction

In the first part of this series, I provided a general overview of Kafka. In this post, we get a bit more hands-on and install Kafka on an Ubuntu virtual machine.

With Kafka, we can create multiple types of clusters, such as the following:

  • A single node cluster with a single broker
  • A single node cluster with multiple brokers
  • A multiple node cluster with multiple brokers

We will start out implementing the first configuration (single node, single broker), and we will then migrate our setup to a single node, multiple brokers configuration.

Operating System

The configuration I used for this post is very simple:

  • My Host machine is a Dell laptop with 32 GB of memory.
  • I am using Oracle VirtualBox. The guest OS is Ubuntu 14.04 LTS.
  • I allocated 12 GB to the guest OS, which should be plenty for running Kafka in both single- and multi-broker setups.

This post picks up right after the OS installation; I had no additional software installed at this point. As a first step, we will need to install Java, after which we can proceed to the Kafka install.

Installing Java

In this case we will install the Java SE edition, since the enterprise edition is not required for a Kafka installation. We will use apt-get for our installation.

First, we need to make sure that the package index is up to date:
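On Ubuntu this is simply:

    sudo apt-get update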

Let’s make sure that java is not already installed by running java -version:

If this returns something like “The program ‘java’ can be found in the following packages…”, Java has not been installed yet, in which case we can install it as follows:
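A hedged sketch, assuming the default Ubuntu package (the exact package name on your image may differ):

    sudo apt-get install default-jre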

This will install the Java Runtime Environment (JRE). If you also need the Java Development Kit (JDK), which is usually needed to compile Java applications (for example with Apache Ant, Apache Maven, Eclipse, IntelliJ etc.), then you can issue the following command:
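Again assuming the default package name:

    sudo apt-get install default-jdk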

At this point, Java should be fully installed and ready to go. We need to ensure that we have our JAVA_HOME environment variable set correctly, which is covered in the next section.

Adding the JAVA_HOME environment variable

We want to  make sure that we set the JAVA_HOME variable in a persistent way, so we don’t have to reset it after a login or reboot. Follow these steps to set your JAVA_HOME environment variable:

  • Edit the /etc/environment file with your favorite editor
  • Add the JAVA_HOME statement and point it to the correct location. In this case we want to use /usr/bin, since this is the default installation location for apt-get:

JAVA_HOME

  • Use source to re-load the variables, as is shown in the figure below:

JAVA_HOME_consumed

At this point our Java installation is complete, and we are ready to download Kafka.

Downloading Kafka

To download Kafka, go to: http://kafka.apache.org/downloads.html. I downloaded version 0.8.2.1, built for Scala 2.10, as is shown in the figure below:

DownloadKafka

On the next page, use the default mirror site. Once we start the download, the file will be copied to your ~/Downloads directory.

We will install Kafka under our /opt directory. Follow these steps, summarized in the sketch after the list:

  • Go to the /opt directory:

  • Copy the kafka.tgz files from your download directory:

  • Unzip the files:
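A combined sketch of these steps (the archive name matches the 0.8.2.1 / Scala 2.10 download; adjust it to the file you actually downloaded):

    cd /opt
    sudo cp ~/Downloads/kafka_2.10-0.8.2.1.tgz .
    sudo tar -xzf kafka_2.10-0.8.2.1.tgz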

After this, the kafka directory will look as follows:

KafkaDirectory

Finally, we need to update the /etc/environment file to set the KAFKA_HOME path, and to add Kafka to the PATH environment variable itself:

KAFKA_HOME

Once we source the /etc/environment file we should be ready to go:

KAFKA_HOME_Consumed
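For reference, a hedged sketch of the /etc/environment additions and the reload step (the existing PATH entries shown here are the Ubuntu defaults and may differ on your system):

    KAFKA_HOME="/opt/kafka_2.10-0.8.2.1"
    PATH="/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/opt/kafka_2.10-0.8.2.1/bin"

    source /etc/environment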

Setting up a Single Node, Single Broker Cluster

Introduction

In the previous section, we installed Kafka on a single machine. In this section we will set up a single node – single broker Kafka cluster, as is shown in the figure below:

SingleNodeKafkaCluster

Start ZooKeeper Service

As I mentioned in the previous post, Kafka uses ZooKeeper, so we first need to start a ZooKeeper server. The settings for ZooKeeper are maintained in the config/zookeeper.properties file. The more relevant parts of this file are shown below:
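The stock file is short; the relevant entries look roughly like this:

    # the directory where the snapshot is stored
    dataDir=/tmp/zookeeper
    # the port at which the clients will connect
    clientPort=2181
    # disable the per-ip limit on the number of connections
    maxClientCnxns=0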

By default, the ZooKeeper server will listen on the 2181 TCP port. For detailed information on setting up ZooKeeper servers, please visit  http://zookeeper.apache.org. Start the ZooKeeper server as follows:
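Assuming the stock scripts in the Kafka install directory:

    bin/zookeeper-server-start.sh config/zookeeper.properties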

The ZooKeeper server will startup, as is shown in the figure below:

ZooKeeperStart

I recommend that you use different terminal windows; that way you can keep track of what is happening across the different servers that you will be starting as part of this post.

Start the Kafka Broker

Now we are ready to start the Kafka broker. The configuration properties of the broker are defined in the config/server.properties configuration file. The most relevant portions of this file are shown below:
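Abridged, the defaults look roughly like this:

    broker.id=0
    port=9092
    log.dirs=/tmp/kafka-logs
    zookeeper.connect=localhost:2181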

Here, we see that the id of the broker is set to zero. All brokers in a cluster will be assigned sequential ids starting with zero. The broker itself will listen on port 9092, and it will store its logs in the /tmp/kafka-logs directory. We also see the connectivity string for ZooKeeper, which uses the same client port, 2181.

To start the broker, launch a new terminal window, and issue the following command:
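Roughly:

    bin/kafka-server-start.sh config/server.properties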

The broker will start, showing its output in the terminal window:

brokerStart

Creating a Kafka Topic

Kafka provides a command line utility named kafka-topics.sh to create topics on the server. To create a topic named telemetry with a single partition and only one replica, start a new terminal instance and issue the following command:
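Roughly, using the stock utility and the default ZooKeeper port:

    bin/kafka-topics.sh --create --zookeeper localhost:2181 \
        --replication-factor 1 --partitions 1 --topic telemetry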

The kafka-topics.sh utility will create the topic, and show a successful creation message:

KafkaTopicCreated

In the broker terminal window, we can watch the topic being created:

BrokerTopicCreated

The log for the new topic has been created in /tmp/kafka-logs/ as was specified in the config/server.properties file:

KafkaLogsCreated

To get a list of topics on any Kafka server, we can use the following command in our terminal window:
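Roughly:

    bin/kafka-topics.sh --list --zookeeper localhost:2181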

The utility will output the name of our telemetry topic as is shown below:

KafkaListTopics

At this point, we have:

  1. Started the ZooKeeper server, which provides the coordination services for our Kafka Cluster.
  2. Started a single Kafka broker.
  3. Created a topic

We are now ready to start sending and receiving messages. Kafka provides some out-of-the-box utilities which can act as message producers and consumers; let’s move on to those next.

Starting a Producer to send messages

Kafka provides users with a command line producer client that accepts input from the command line and publishes each line as a message to the Kafka cluster. By default, each new line is published as a new message. Start (yet another) new terminal window, and issue the following command to start the producer:
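Roughly:

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic telemetry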

The following parameters are required for the producer command line client:

  • The broker list. This is the list of brokers that we want to send the messages to. In this case we only have one broker. The format in which each broker should be specified is <node_address:port>, which in our case is localhost:9092, since we know our broker is listening on port 9092, as specified in config/server.properties.
  • The second parameter is the name of the topic, which in our case is telemetry.

The producer will wait on input from stdin. Go ahead and type a few lines of messages, as is shown below:

KafkaProducer

The default properties for the producer are defined in the config/producer.properties file. The most important properties are listed below:
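Abridged, the stock file contains entries along these lines:

    metadata.broker.list=localhost:9092
    compression.codec=none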

The metadata.broker.list property allows us to specify the list of brokers. In our case we only have one broker at port 9092. At the moment, we are not using any compression, so we specify none as the compression codec.

In later posts, we will cover the details of writing producers for Kafka.

Starting a consumer to consume messages

Kafka also provides a command line consumer client for message consumption. The following command is used to start a console-based consumer that shows the output in a terminal window as soon as it registers to the telemetry topic created in the Kafka broker:
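Roughly:

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic telemetry --from-beginning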

When you use the producer from the previous section, you should see the following output:

KafkaConsumer

As one can see, the consumer will echo the text typed in the producer. The default properties for the consumer are defined in the config/consumer.properties file. The most important properties are shown below:
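Abridged, the stock file looks roughly like this:

    zookeeper.connect=127.0.0.1:2181
    zookeeper.connection.timeout.ms=6000
    group.id=test-consumer-group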

We will cover consumer groups and the details of writing consumers in later posts.

Conclusion

By running all four components (zookeeper, broker, producer and consumers) in different terminals, we are able to enter messages from the producer’s terminal and see them appearing in the consumer’s terminal. We now have a very good understanding of the basics of Kafka.

In the next post, we will modify our configuration to use a single node – multiple broker setup.


Experiences with Apache Kafka – Part I

Introduction

This is the first post in a series which will focus on Apache Kafka and how it can be leveraged in a Big Data environment. Kafka is typically used as the primary message ingestion point for a Hadoop streaming application, where it is often paired with Apache Storm to perform real-time event processing.

Kafka Overview

Kafka is an

  • Open source
  • Distributed
  • Partitioned
  • Replicated
  • Commit-log-based

publish-subscribe messaging system.

For the remainder of this post, I would like to establish the following standard terminology to define the components that make up a typical Kafka messaging system:

  • The key abstraction in Kafka is the Topic. At the implementation level, a Kafka topic is just a sharded write-ahead log. Topics are partitioned, and each partition is represented by an ordered, immutable sequence of messages.
  • Processes that publish messages to a Kafka topic are referred to as Producers. Producers publish data to a topic by choosing the appropriate partition within the topic.
  • Processes that subscribe to topics and process the published messages are called Consumers.
  • Kafka is run as a cluster comprised of one or more servers each of which is called a Broker. Topics are created within the context of broker processes.

These core concepts are illustrated in the figure below:

Kafka


Communication between the clients and the servers is performed with a simple, high-performance, language agnostic TCP protocol. As part of this blog series, we will write a number of Java, .NET C#, and Python clients.

Topics

A topic can be seen as a category, stream or feed to which a producer can publish messages. Consumers can read the published messages from the specified topics. This is illustrated in the figure below:

Topic Overview


For each topic the Kafka cluster maintains a partitioned log which looks like the following image:

topics

A partition is nothing more than an immutable, ordered sequence of messages. As messages are published the commit log will be updated. Each message in a partition is assigned a sequential ID number which uniquely identifies each message within the partition. This sequence number is often referred to as the message offset.

All messages in a Kafka cluster are retained for a configurable amount of time, regardless of whether they have been consumed. In this respect, Kafka is similar to Azure’s Event Hub messaging service. So, for example, if the message retention policy is set to five days, then for the first five days after a message is published it will remain available for consumption. After the retention window expires, the message will be discarded to free up space. The performance of Kafka is relatively constant with respect to partition size, so retaining a lot of data will not negatively affect overall performance.

The only metadata retained by the system on a per-consumer basis is the position of the consumer in the log, called the “offset”. This offset is controlled by the consumer; normally the consumer will advance its offset linearly as it reads messages, but since the consumer is in control, it can consume the messages in any preferred order. For example, a consumer can reset to an older offset if it wishes to re-process a batch of messages.

This also implies that Kafka consumers are very lightweight, and have very little impact on the cluster’s overall performance. Since each consumer maintains its own offset, it has no ability to impact other consumers.

Partitions within a topic really serve two distinct purposes:

  1. They allow a topic to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions, so it can handle an arbitrary amount of data.
  2. Partitions act as the primary unit of parallelism in Kafka. They allow a single consumer to read messages in multiple concurrent threads. I will elaborate on this subject in detail in a later post.

The following image summarizes our insights on topics and partitions:

PartitionsSummary

Partition Distribution

The partitions of a topic are distributed over the servers in the Kafka cluster, with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.

Each partition has one server which acts as the “leader” and zero or more servers which act as “followers”. The leader handles all read and write requests for the partition, while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. A follower acts as a normal consumer: it pulls messages and updates its own log. When a follower rejoins a partition, it first re-syncs all messages, after which it comes fully online.

The leader uses the Zookeeper heartbeat mechanism to indicate that it is alive and well. Each server acts as a leader for some of its partitions and as a follower for others, so load is well balanced within the cluster.

The basic principles of replication are summarized in the figure below:

Replication

Zookeeper

Another important component of a Kafka infrastructure is Zookeeper. While most readers will know Zookeeper from the larger Hadoop ecosystem, it is also used in stand-alone Kafka deployments.

Zookeeper serves as the coordination interface between the Kafka brokers and consumers. At its core, Zookeeper is an open source, high-performance coordination service for distributed applications. Within the Apache Hadoop ecosystem it acts as a centralized service for tasks such as:

  • Configuration Management. Zookeeper enables the cluster member nodes to “bootstrap” their configuration from a central source.
  • Group Membership. It will manage the cluster’s node status in real time and coordinate nodes joining or leaving the cluster.
  • Naming Services and Registry. Zookeeper maintains a centralized and highly available registry where it maintains the central naming services.
  • Locking and Synchronization. Zookeeper will maintain any shared locks, barriers etc…
  • Leader Switch-Over.  Zookeeper maintains the shared state between partition leaders and followers, ensuring that a leader transition is seamless.

An example of how Kafka leverages Zookeeper is shown in the figure below:

Zookeeper

In this case the producer will first call into Zookeeper to get the Kafka broker address. As the producers start streaming in messages, the broker maintains its state in Zookeeper. The consumers also track their individual message offsets through Zookeeper. This is actually done per consumer group instead of per individual consumer; more about that later.

Producers

Producers publish their data to the topics of their choice. The producer is responsible for choosing which message to assign to which partition within the topic. This can be done in two different ways:

  1. Producers can publish to the different partitions in a round-robin fashion, thereby automatically balancing the load.
  2. Producers can dynamically select the partition by means of a “semantic partitioning function”, which is typically based upon a key contained in the message.

 Consumers

In messaging we have typically recognized two core models:

  1. Queuing
  2. Publish-subscribe

In a queuing model, a pool of consumers can read from a queue, but a single message only goes to exactly one consumer. This model is shown in the figure below:

queus

In the above queuing system, we have 3 messages (in this case messages containing orders). Each message will be delivered to exactly one consumer. This model is typically used to achieve load balancing across a distributed system.

In a publish-subscribe model the message is broadcast to all subscribing consumers, as is shown below:

PublishSubscribe

With the concept of the Consumer Group Kafka offers a single consumer abstraction that generalizes queuing and publish-subscribe.

In this model, consumers label themselves with a consumer group name, and each message published to the topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes on separate machines, as is shown in the figure below:

ConsumerGroups

 

If all the consumer instances have the same consumer group, then this model works just like a traditional queue, balancing the load over the consumers.

On the other hand, if all the consumer instances have different consumer groups, then the model works like traditional publish-subscribe and all messages are broadcast to all consumers.

In most scenarios, a topic will have a small number of consumer groups, one for each logical subscriber. Each group is composed of many consumer instances for scalability and fault tolerance. Effectively, this enforces publish-subscribe semantics where the subscriber is a cluster of consumers instead of a single process.

Guarantees

The Kafka messaging system provides the following guarantees:

  • Messages sent by a producer to a particular topic partition will be appended in the order that they were sent. If a message Msg1 is sent by the same producer as message Msg2, and Msg1 is sent first, then Msg1 will have a lower offset than Msg2 and appear earlier in the log.
  • A consumer instance sees messages in the order they are stored in the log.
  • For a topic with replication factor N, Kafka will tolerate up to N-1 server failures without losing any messages committed to the log.

Use Cases

A number of popular use cases for this application are listed below:

  • Messaging. Kafka tends to work very well as a replacement for a more traditional message broker. In comparison to most messaging systems Kafka has better throughput, built-in partitioning, replication and inherent fault-tolerance which makes it a good fit for large scale message processing applications.
  • IOT Application Analytics. Kafka is a good fit as a message-ingestion point for an IOT Analytics sub-system. Telemetry events are routed to Kafka topics and are consumed downstream by a real-time analytics engine such as Apache Storm. Within this blog series we will perform a deep dive into this topic.
  • Log Aggregation. A number of applications use Kafka as a replacement for a log aggregation solution. Log aggregation typically collects physical log files from remote servers, and centralizes them in one location so they can be processed. Kafka provides  a clean abstraction of log data as a stream of messages instead of a traditional file. This enables lower-latency processing and easier support for multiple data sources and distributed data consumption. A Kafka-based system tends to have  lower latency and stronger delivery guarantees.
  • Metrics. Kafka is often used for operational monitoring data. This involves aggregating statistics from distributed applications to produce centralized feeds of operational data. Indeed, Kafka is a good fit for collecting “Telemetry about a system”, where log data from different instances of the application is aggregated into a HDFS-based back end.

What is next?

In the next post of this series, we will install Kafka on a “clean” Ubuntu distribution. We will create a number of topics, and run both a message consumer and producer.

 

PowerShell Record Extractor

Introduction

When you work on Big Data projects, you tend to work with very big files. In my day job, I frequently deal with CSV files that are larger than 500 MB. Even on my Dell workstation, it takes a long time to open such a file in Excel or any text editor, such as Notepad++. During development you typically don’t want to work with files of this size; in a lot of scenarios it is sufficient to work with just the top N records.

I found myself frequently trying to use the PowerShell Get-Content or Import-Csv cmdlets, but even those don’t handle large files well, especially since they still try to read the entire file. In addition, cmdlets such as Import-Csv assume a given format (in this case comma-separated).

Getting a bit frustrated with what was out there, I decided to quickly write my own little PowerShell script, which is described below.

The ExtractRecords Script

Requirements

The requirements for my custom script were very simple and straight-forward:

  1. The script should work with text files and read and write a line at a time.
  2. The code should not care about any text formats (CSV, tab-delimited etc..).
  3. The parameters to the script should be the following:
    • The first parameter should allow me to specify the input file name.
    • The second parameter specifies the output file name.
    • The next parameter should specify the number of records to extract. This parameter should default to 1000.
    • The final parameter should allow me to optionally skip the first (header) record. In some cases, my files do have header records, while in other scenarios there is no header record, so the script should be able to handle both.
  4. Most of all, the script should be fast, very fast! Did I mention that it should be fast? In our projects we already have enough work to do without being slowed down by mundane tasks like extracting a test data set.

The script file is named ExtractRecords.ps1 and it can be found at my GitHub repository:

Usage

In these examples I will be working with a CSV file named BigFile.csv, which is about 458 MB in size.  In this first example, let’s assume that we would like to extract the first 5000 records into  a file named Extract.csv:

A sample run in PowerShell is shown below:

By default, the script will NOT include the first (header) record. If you do want this first header row included, you can run the script as follows:
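A hedged usage sketch; the parameter names below are hypothetical, since the actual script defines its own:

    # Extract the first 5000 records (header row skipped by default):
    .\ExtractRecords.ps1 -InputFile .\BigFile.csv -OutputFile .\Extract.csv -RecordCount 5000

    # Include the header row as well:
    .\ExtractRecords.ps1 -InputFile .\BigFile.csv -OutputFile .\Extract.csv -RecordCount 5000 -IncludeHeader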

Performance

The script is indeed very fast. Below is a sample run where I write 100,000 records. The script takes just a bit over half a second to read and write 100,000 lines. To measure performance I used the Measure-Command cmdlet, which is really handy for these type of tasks:

 

Script Analysis

In this section we take a quick look at how the script is structured. I am by no means a PowerShell specialist, so any feedback is always welcome.

I declare the script parameters in a param block. I specify the parameters, and write out their values as information to the user:

Next, I make sure that the input and output file names were specified:


I convert the specified paths into “fully qualified paths”. The PSCmdLet.GetUnresolvedProviderPathFromPSPath is a great method to get the fully-qualified path:

Next, I need to make sure that the file specified by the full input path does exist:

We use the StreamReader and StreamWriter classes from the .NET System.IO namespace. This is really one of the fastest ways to read and write files. The remainder of the script is shown below:
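A minimal sketch of such a read/write loop; the variable and parameter names are illustrative, not the script’s actual ones:

    $InputPath     = 'C:\data\BigFile.csv'    # illustrative values
    $OutputPath    = 'C:\data\Extract.csv'
    $RecordCount   = 1000
    $IncludeHeader = $false

    $reader = New-Object System.IO.StreamReader($InputPath)
    $writer = New-Object System.IO.StreamWriter($OutputPath)
    try {
        # Skip the header row unless it was explicitly requested.
        if (-not $IncludeHeader) { $null = $reader.ReadLine() }
        $count = 0
        while (($count -lt $RecordCount) -and (($line = $reader.ReadLine()) -ne $null)) {
            $writer.WriteLine($line)
            $count++
        }
    }
    finally {
        $reader.Close()
        $writer.Close()
    }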

 

Using the Hadoop HDFS Command Line

Introduction

The Hadoop File System (HDFS) includes an easy to use command line interface which you can use to manage the distributed file system. You will need access to a working Hadoop cluster if you want to follow along. If you don’t have direct access to a cluster, you can create a local Hortonworks or Cloudera single-node cluster, as described in my earlier post: Getting Started with Enterprise Hadoop.

Logging in to the cluster

Since we will be using the command line interface here, we need to be able to log into our cluster. In my case I am using the Hortonworks Sandbox VM, running in VirtualBox. On my local machine, I created the hostname hw_sandbox by adding the following entry to my C:\Windows\System32\drivers\etc\hosts file:

The generic syntax for using ssh is as follows:

This is what I use to log into my Hortonworks sandbox:
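Roughly (the Hortonworks sandbox forwards SSH on port 2222 by default, but your setup may differ):

    # Generic form:
    ssh <user>@<hostname> -p <port>

    # Hortonworks sandbox:
    ssh root@hw_sandbox -p 2222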

Note that, unlike me, you should not be using the root user name if you are on a real cluster!

Generic Command Structure

When you look for the right command line syntax to use, you might get different answers. Three different command lines are supported. They appear to be very similar, but they have minute differences:

  1. hadoop fs <arguments>
  2. hadoop dfs <arguments>
  3. hdfs dfs <arguments>

Let’s take a look at the command structure for each one. First the hadoop fs command line:


FS relates to any generic file system, which could be local, HDFS, DFTP, S3 FS etc. Most of the Hadoop distributions are configured to default to HDFS if you specify hadoop fs. You can explicitly specify HDFS in the hadoop command line, like so:


This explicitly specifies HDFS, and would work for any HDFS operations. However, this command has been deprecated. You should use the hdfs dfs syntax instead, as is shown below:
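For example:

    hdfs dfs <arguments>

    # e.g. list the root of HDFS
    hdfs dfs -ls /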


This syntax works for all operations against HDFS, and it is the recommended command to use instead of hadoop fs or hadoop dfs. Indeed, if you use the hadoop dfs syntax, it will locate hdfs and delegate the command to hdfs dfs.

Creating a directory in HDFS

The syntax for creating a directory is shown below:

You can specify a path using two different syntax notations, as is shown below:

  • You can use a standard local path (for example /user/bennie)
  • You can use the hdfs:// URL syntax (for example hdfs://sandbox.hortonworks.com/user/bennie).

Note: depending on the configuration of your cluster, you might have to specify a port in addition to the host name.
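As a hedged illustration (assuming a /user/bennie home directory), both notations look like this:

    # Standard path notation:
    hdfs dfs -mkdir /user/bennie

    # hdfs:// URL notation:
    hdfs dfs -mkdir hdfs://sandbox.hortonworks.com/user/bennie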

Below is an example of these commands running in the Hortonworks sandbox bash shell:

Uploading a file and Listing Contents

Let’s create a local file; we’ll use echo in this case:


You have a variety of commands available to upload a file to HDFS; I am using the put command here:


Next, we can list the contents of our directory with the ls command. Again, we can use the standard file system notation,  or we can use the hdfs:// notation, as is shown below:
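A combined sketch of these three steps (the file name test.txt is hypothetical):

    # Create a small local file:
    echo "Hello HDFS" > test.txt

    # Upload it to HDFS with put:
    hdfs dfs -put test.txt /user/bennie/

    # List the directory, using either notation:
    hdfs dfs -ls /user/bennie
    hdfs dfs -ls hdfs://sandbox.hortonworks.com/user/bennie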


Below is an example of these commands running in the shell:

Downloading a File from HDFS

We can use the get command to download a file from HDFS to our local file system:
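Roughly (again using the hypothetical test.txt):

    hdfs dfs -get /user/bennie/test.txt .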


Below is an example of this command running in the shell:

Getting Started with Enterprise Hadoop

So, you decided that you would like to learn Hadoop, but you have no idea where to get started? You could go to the Apache Hadoop download page, download the most recent distribution and install it on your system. But be forewarned: doing a local install directly from the Apache distribution is not for the faint of heart, and can be extremely time consuming.

An easy way to get started with Big Data and Enterprise Hadoop is to leverage the sandbox virtual machine images offered by both Cloudera and Hortonworks. In this post, I will perform a quick walk-through of both options.

Using the Hortonworks Sandbox

An easy way to get started with Big Data and Enterprise Hadoop is to leverage the Hortonworks Sandbox. The sandbox is a personal, portable Hadoop environment, containing the latest HDP distribution which runs in a virtual machine.

You have two deployment options with the Hortonworks Sandbox:

  1. You can deploy it as a standalone VM on your local computer.
  2. You can run the Sandbox as a cloud-based virtual machine.

I recommend using the first option if you have a more powerful local computer, ideally with at least 8 GB of RAM (16 GB or more is even better). The cloud-based option is more appropriate if you have a smaller system, or if you need access to the sandbox from multiple devices.

If you decide to install the sandbox locally, you have a choice of three different virtualization environments:

  • Oracle’s VirtualBox.
  • VMWare
  • Hyper-V on Windows

I deployed the sandbox on VirtualBox, and it was a very quick and easy process, although you do need to allow for some time to download the image from the Hortonworks Web site.

Using the Cloudera QuickStart VMs

The Cloudera QuickStart is very similar to the Hortonworks Sandbox offering. The QuickStart VMs contain a single-node Apache Hadoop cluster, complete with example data, queries, scripts and Cloudera Manager.

The Quickstart supports the following virtualization environments:

  • Oracle’s VirtualBox.
  • VMWare
  • KVM

I deployed the QuickStart on my local machine’s VirtualBox, and again the process was very straightforward.

Conclusion

Both the Hortonworks Sandbox and the Cloudera QuickStart are a great, painless way of getting started with Enterprise Hadoop. I have noticed that I don’t just use them for learning, but also for experimenting with new projects and new features in a risk-free environment. I would definitely recommend giving either (or both) a spin. Which one you want to use depends on your platform preferences. I tend to like a more “pure” Apache distribution without any proprietary add-ons, so I tend to work more with the Hortonworks Sandbox.