This topic provides instructions for installing a production-ready Confluent Platform configuration in a multi-node RHEL or CentOS environment with a replicated ZooKeeper ensemble.
Important
You must complete these steps for each node in your cluster.
- Prerequisites
- Before installing Confluent Platform, your environment must meet the following software and hardware requirements.
Get the Software

The YUM repositories provide packages for RHEL, CentOS, and Fedora-based distributions. You can install individual Confluent Platform packages or the entire platform. For a list of available packages, see the documentation, or search the repository (`yum search <package-name>`).

- Install the `curl` and `which` tools.
- Install the Confluent Platform public key. This key is used to sign packages in the YUM repository.
- Navigate to `/etc/yum.repos.d/` and create a file named `confluent.repo` with these contents. This adds the Confluent repositories. You must have the entries for both repositories, `[Confluent.dist]` and `[Confluent]`, as shown in the sketch after this list.
- Clear the YUM caches and install Confluent Platform.
  - Confluent Platform:
  - Confluent Platform with RBAC:
  - Confluent Platform using only Confluent Community components:
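The repository file and install commands were stripped from this copy; here is a minimal sketch, assuming Confluent Platform 6.0 on RHEL/CentOS 7 (adjust the version and OS segments of the URLs to match your release):

```ini
[Confluent.dist]
name=Confluent repository (dist)
baseurl=https://packages.confluent.io/rpm/6.0/7
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/6.0/archive.key
enabled=1

[Confluent]
name=Confluent repository
baseurl=https://packages.confluent.io/rpm/6.0
gpgcheck=1
gpgkey=https://packages.confluent.io/rpm/6.0/archive.key
enabled=1
```

With the key imported and the repo file in place, clear the caches and install one of the package sets, for example:

```bash
sudo rpm --import https://packages.confluent.io/rpm/6.0/archive.key
sudo yum clean all
sudo yum install confluent-platform           # full platform
# sudo yum install confluent-community-2.13   # Confluent Community components only
```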
Configure Confluent Platform
Tip
You can store passwords and other configuration data securely by using the confluent secret commands. For more information, see Secrets.
Configure Confluent Platform with the individual component properties files. By default these are located in `<path-to-confluent>/etc/`. You must minimally configure the following components.

ZooKeeper
These instructions assume you are running ZooKeeper in replicated mode. A minimum of three servers is required for replicated mode, and you must have an odd number of servers for failover. For more information, see the ZooKeeper documentation.

- Navigate to the ZooKeeper properties file (`/etc/kafka/zookeeper.properties`) and modify it as shown in the sketch after this list. This configuration is for a three-node ensemble, and the file should be identical across all nodes in the ensemble. `tickTime`, `dataDir`, and `clientPort` are all set to typical single-server values. The `initLimit` and `syncLimit` settings govern how long following ZooKeeper servers can take to initialize with the current leader and how long they can be out of sync with the leader. In this configuration, a follower can take 10000 ms to initialize and can be out of sync for up to 4000 ms based on the `tickTime` being set to 2000 ms. The `server.*` properties set the ensemble membership. The format is `server.<myid>=<hostname>:<leaderport>:<electionport>`, where `myid` is the server identification number. There are three servers that each have a different `myid` with values `1`, `2`, and `3` respectively. The `myid` is set by creating a file named `myid` in the `dataDir` that contains a single integer in human-readable ASCII text. This value must match one of the `myid` values from the configuration file. You will see an error if another ensemble member is already started with a conflicting `myid` value. `leaderport` is used by followers to connect to the active leader; this port should be open between all ZooKeeper ensemble members. `electionport` is used to perform leader elections between ensemble members; this port should be open between all ZooKeeper ensemble members. The `autopurge.snapRetainCount` and `autopurge.purgeInterval` settings have been set to purge all but three snapshots every 24 hours.
- Navigate to the ZooKeeper log directory (e.g., `/var/lib/zookeeper/`) and create a file named `myid`. The `myid` file consists of a single line that contains the machine ID in the format `<machine-id>`. When the ZooKeeper server starts up, it knows which server it is by referencing the `myid` file. For example, server 1 will have a `myid` value of `1`.
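As referenced in the first step, a minimal `zookeeper.properties` sketch with the values described above (the `zk-1`/`zk-2`/`zk-3` hostnames are placeholders):

```properties
tickTime=2000
dataDir=/var/lib/zookeeper/
clientPort=2181
# initLimit * tickTime = 10000 ms for a follower to initialize with the leader
initLimit=5
# syncLimit * tickTime = 4000 ms maximum drift from the leader
syncLimit=2
# server.<myid>=<hostname>:<leaderport>:<electionport>
server.1=zk-1:2888:3888
server.2=zk-2:2888:3888
server.3=zk-3:2888:3888
# purge all but three snapshots every 24 hours
autopurge.snapRetainCount=3
autopurge.purgeInterval=24
```

On server 1, for example, the `myid` file from the second step can be created with:

```bash
echo 1 > /var/lib/zookeeper/myid
```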
Kafka
In a production environment, multiple brokers are required. During startup, brokers register themselves in ZooKeeper to become a member of the cluster.
Navigate to the Apache Kafka® properties file (`/etc/kafka/server.properties`) and customize the following; a combined sketch follows this list.

- Connect to the same ZooKeeper ensemble by setting `zookeeper.connect` in all nodes to the same value. Replace all instances of `localhost` with the hostname or FQDN (fully qualified domain name) of your node, for example, the hostname `zookeeper`.
- Configure the broker IDs for each node in your cluster using one of these methods.
  - Dynamically generate the broker IDs: add `broker.id.generation.enable=true` and comment out `broker.id`.
  - Manually set the broker IDs: set a unique value for `broker.id` on each node.
- Configure how other brokers and clients communicate with the broker using `listeners`, and optionally `advertised.listeners`.
  - `listeners`: Comma-separated list of URIs and listener names to listen on.
  - `advertised.listeners`: Comma-separated list of URIs and listener names for other brokers and clients to use. The `advertised.listeners` parameter ensures that the broker advertises an address that is accessible from both local and external hosts.

  For more information, see Production Configuration Options.
- Configure security for your environment.
  - For general security guidance, see Security.
  - For role-based access control (RBAC), see Configure Metadata Service (MDS).
  - For SSL encryption, SASL authentication, and authorization, see Security Tutorial.
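A minimal `server.properties` sketch covering the steps above, assuming a ZooKeeper ensemble on hosts `zookeeper-1` through `zookeeper-3` and a broker whose own FQDN is `kafka-1` (all hostnames are placeholders):

```properties
# same value on every broker so they join the same cluster
zookeeper.connect=zookeeper-1:2181,zookeeper-2:2181,zookeeper-3:2181

# option 1: let the cluster assign broker IDs automatically
broker.id.generation.enable=true
#broker.id=0

# option 2: set a unique ID per node instead
#broker.id=1

# where the broker listens, and the address it advertises to clients
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka-1:9092
```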
Control Center
- Navigate to the Control Center properties file (`/etc/confluent-control-center/control-center-production.properties`) and customize the following; see the sketch after this list. This configuration is for a three-node cluster. For more information, see Control Center configuration details. For information about Confluent Platform licenses, see Managing Confluent Platform Licenses.
- Navigate to the Kafka server configuration file (`/etc/kafka/server.properties`) and enable Confluent Metrics Reporter.
- Add these lines to the Kafka Connect properties file (`/etc/kafka/connect-distributed.properties`) to add support for the interceptors.
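The code blocks for these steps were stripped from this copy. A sketch, using the standard Confluent reporter and interceptor class names (the bootstrap addresses and replication factor are placeholders to adapt):

```properties
# /etc/confluent-control-center/control-center-production.properties
bootstrap.servers=kafka-1:9092,kafka-2:9092,kafka-3:9092
confluent.controlcenter.internal.topics.replication=3

# /etc/kafka/server.properties -- enable Confluent Metrics Reporter
metric.reporters=io.confluent.metrics.reporter.ConfluentMetricsReporter
confluent.metrics.reporter.bootstrap.servers=kafka-1:9092
confluent.metrics.reporter.topic.replicas=3

# /etc/kafka/connect-distributed.properties -- monitoring interceptors
consumer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
producer.interceptor.classes=io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
```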
Confluent REST Proxy
Navigate to the Confluent REST Proxy properties file (`/etc/kafka-rest/kafka-rest.properties`) and customize the following:

- Optionally configure `zookeeper.connect`. ZooKeeper connectivity is needed for the earlier /v1/ consumer endpoints. Change `localhost` to the hostname or FQDN (fully qualified domain name) of your node. For example, if your hostname is `zookeeper`, set the value shown in the sketch below.
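A one-line sketch, assuming the ZooKeeper host is named `zookeeper`:

```properties
zookeeper.connect=zookeeper:2181
```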
Schema Registry
Navigate to the Schema Registry properties file (`/etc/schema-registry/schema-registry.properties`) and specify the following properties; a sketch follows. This configuration is for a three-node cluster. For more information, see Running Schema Registry in Production.
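A minimal sketch of the kind of settings involved, assuming brokers `kafka-1` through `kafka-3` (hostnames are placeholders; the listener and topic values are the usual defaults):

```properties
# where Schema Registry listens for REST requests
listeners=http://0.0.0.0:8081
# the Kafka cluster backing the schema store
kafkastore.bootstrap.servers=PLAINTEXT://kafka-1:9092,PLAINTEXT://kafka-2:9092,PLAINTEXT://kafka-3:9092
# the topic in which schemas are stored
kafkastore.topic=_schemas
```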
Start Confluent Platform
Start Confluent Platform and its components using systemd service unit files. You can start immediately by using the `systemctl start` command, or enable automatic startup by using the `systemctl enable` command. These instructions use the syntax for immediate startup; example commands follow the list below.

Tip
ZooKeeper, Kafka, and Schema Registry must be started in this specific order, and must be started before any other components.
- Start ZooKeeper.
- Start Kafka.
  - Confluent Platform:
  - Confluent Platform using only Confluent Community components:
- Start Schema Registry.
- Start other Confluent Platform components as desired.
  - Control Center
  - Kafka Connect
  - Confluent REST Proxy
  - ksqlDB
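A sketch of the immediate-startup commands, assuming the systemd unit names shipped with recent Confluent Platform packages (`confluent-server` applies to the full platform; Community installs use `confluent-kafka` instead):

```bash
sudo systemctl start confluent-zookeeper
sudo systemctl start confluent-server            # Community: confluent-kafka
sudo systemctl start confluent-schema-registry
# optional components
sudo systemctl start confluent-control-center
sudo systemctl start confluent-kafka-connect
sudo systemctl start confluent-kafka-rest
sudo systemctl start confluent-ksqldb
```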
Tip

You can check service status with this command: `systemctl status confluent*`. For more information about the systemd service unit files, see Using Confluent Platform systemd Service Unit Files.

Uninstall
Run this command to remove Confluent Platform, where `<component-name>` is either `confluent-platform` (Confluent Platform) or `confluent-community-2.13` (Confluent Platform using only Confluent Community components). For example, run this command to remove Confluent Platform:
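A sketch of the removal command; the `autoremove` form also drops dependencies that are no longer needed:

```bash
sudo yum autoremove confluent-platform
```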
Next Steps
Try out the Apache Kafka Quick Start.
Confluent Platform includes the Java producer and consumer shipped with Apache Kafka®.
Java Client installation

All JARs included in the packages are also available in the Confluent Maven repository. Here's a sample POM file showing how to add this repository:
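The sample POM fragment was stripped from this copy; a minimal sketch pointing at Confluent's public Maven repository:

```xml
<repositories>
  <repository>
    <id>confluent</id>
    <url>https://packages.confluent.io/maven/</url>
  </repository>
</repositories>
```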
The Confluent Maven repository includes compiled versions of Kafka. To reference the Kafka version 2.6 that is included with Confluent Platform 6.0.0, use the dependency shown after the following note in your `pom.xml`.

Note
Version names of Apache Kafka vs. Kafka in Confluent Platform: Confluent always contributes patches back to the Apache Kafka® open source project. However, the exact versions (and version names) being included in Confluent Platform may differ from the Apache artifacts when Confluent Platform and Kafka releases do not align. If they are different, Confluent keeps the `groupId` and `artifactId` identical, but appends the suffix `-ccs` to the version identifier of the Confluent Platform version to distinguish these from the Apache artifacts.
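Putting the note above into practice, a sketch of the dependency for the Kafka that ships with Confluent Platform 6.0.0 (verify the exact `-ccs` version string against the repository):

```xml
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>6.0.0-ccs</version>
</dependency>
```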
You can reference artifacts for all Java libraries that are included with Confluent Platform. For example, to use the Avro serializer you can include the following in your `pom.xml`:
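A sketch, assuming Confluent Platform 6.0.0 (Confluent's own artifacts use plain version numbers, without the `-ccs` suffix):

```xml
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-avro-serializer</artifactId>
  <version>6.0.0</version>
</dependency>
```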
Tip

You can also specify the `kafka-protobuf-serializer` or `kafka-jsonschema-serializer` serializers. For more information, see Schema Formats, Serializers, and Deserializers.

Java Client example code
For Hello World examples of Kafka clients in Java, see Java. All examples include a producer and consumer that can connect to any Kafka cluster running on-premises or in Confluent Cloud. They also include examples of how to produce and consume Avro data with Schema Registry.
Kafka Producer

Initialization
The Java producer is constructed with a standard `Properties` file. Configuration errors will result in a raised `KafkaException` from the constructor of `KafkaProducer`.
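A minimal construction sketch (the broker address and the choice of string serializers are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

// throws KafkaException from the constructor if the configuration is invalid
Producer<String, String> producer = new KafkaProducer<>(props);
```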
Asynchronous writes
The Java producer includes a `send()` API which returns a future that can be polled to get the result of the send. To invoke some code after the write has completed, you can also provide a callback; in Java this is implemented as a `Callback` object, as in the sketch below. In the Java implementation you should avoid doing any expensive work in this callback since it is executed in the producer's IO thread.
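A sketch of an asynchronous send with a callback (topic name and record contents are placeholders):

```java
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

producer.send(new ProducerRecord<>("my-topic", "key", "value"), new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // the send failed; log or handle the error. Keep this cheap:
            // the callback runs on the producer's IO thread.
            exception.printStackTrace();
        } else {
            System.out.printf("wrote to %s-%d@%d%n",
                metadata.topic(), metadata.partition(), metadata.offset());
        }
    }
});
```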
Synchronous writes
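The example for this section did not survive extraction; the usual pattern is to block on the future returned by `send()`, for instance:

```java
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

try {
    // get() blocks until the broker acknowledges the write
    RecordMetadata metadata =
        producer.send(new ProducerRecord<>("my-topic", "key", "value")).get();
} catch (InterruptedException | ExecutionException e) {
    // the send failed or the wait was interrupted
    e.printStackTrace();
}
```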
Kafka Consumer

Initialization
The Java consumer is constructed with a standard `Properties` file. Configuration errors will result in a `KafkaException` raised from the constructor of `KafkaConsumer`.
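A minimal construction sketch mirroring the producer example (group ID, broker address, and deserializers are placeholders):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "kafka-1:9092");
props.put("group.id", "my-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// throws KafkaException from the constructor if the configuration is invalid
Consumer<String, String> consumer = new KafkaConsumer<>(props);
```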
Basic usage

The Java client is designed around an event loop which is driven by the `poll()` API. This design is motivated by the UNIX `select` and `poll` system calls. A basic consumption loop with the Java API usually takes the form shown in the sketch after this list. There is no background thread in the Java consumer. The API depends on calls to `poll()` to drive all of its IO including:

- Joining the consumer group and handling partition rebalances.
- Sending periodic heartbeats if part of an active generation.
- Sending periodic offset commits (if autocommit is enabled).
- Sending and receiving fetch requests for assigned partitions.
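A sketch of the basic consumption loop referenced above (the topic name, the `running` flag, and the `process` helper are hypothetical placeholders):

```java
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

consumer.subscribe(Collections.singletonList("my-topic"));
try {
    while (running) {
        // poll() drives all consumer IO: fetches, heartbeats, rebalances
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
            process(record); // application-specific handling
        }
    }
} finally {
    consumer.close();
}
```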
Due to this single-threaded model, no heartbeats can be sent while the application is handling the records returned from a call to `poll()`. This means that the consumer will fall out of the consumer group if either the event loop terminates or if a delay in record processing causes the session timeout to expire before the next iteration of the loop. This is actually by design. One of the problems that the Java client attempts to solve is ensuring the liveness of consumers in the group. As long as the consumer is assigned partitions, no other members in the group can consume from the same partitions, so it is important to ensure that it is actually making progress and has not become a zombie. This feature protects your application from a large class of failures, but the downside is that it puts the burden on you to tune the session timeout so that the consumer does not exceed it in its normal record processing. The `max.poll.records` configuration option places an upper bound on the number of records returned from each call. You should use both `poll()` and `max.poll.records` with a fairly high session timeout (e.g. 30 to 60 seconds), keeping the number of records processed on each iteration bounded so that worst-case behavior is predictable.

If you fail to tune these settings appropriately, the consequence is typically a `CommitFailedException` raised from the call to commit offsets for the processed records. If you are using the automatic commit policy, then you might not even notice when this happens since the consumer silently ignores commit failures internally (unless it's occurring often enough to impact lag metrics). You can catch this exception and either ignore it or perform any needed rollback logic.

Java Client code examples
Basic poll loop
The consumer API is centered around the `poll()` method, which is used to retrieve records from the brokers. The `subscribe()` method controls which topics will be fetched in poll. Typically, consumer usage involves an initial call to `subscribe()` to set up the topics of interest and then a loop which calls `poll()` until the application is shut down.

The consumer intentionally avoids a specific threading model. It is not safe for multi-threaded access and it has no background threads of its own. In particular, this means that all IO occurs in the thread calling `poll()`. In the consumer example below, the poll loop is wrapped in a `Runnable` which makes it easy to use with an `ExecutorService`.

The poll timeout is hard-coded to 500 milliseconds. If no records are received before this timeout expires, then `poll()` will return an empty record set. It's not a bad idea to add a shortcut check for this case if your message processing involves any setup overhead.
To shut down the consumer, a flag is added which is checked on each loop iteration. After shutdown is triggered, the consumer will wait at most 500 milliseconds (plus the message processing time) before shutting down since it might be triggered while it is in `poll()`. A better approach is provided in the next example.

Note that you should always call `close()` after you are finished using the consumer. Doing so will ensure that active sockets are closed and internal state is cleaned up. It will also trigger a group rebalance immediately, which ensures that any partitions owned by the consumer are re-assigned to another member in the group. If not closed properly, the broker will trigger the rebalance only after the session timeout has expired. A latch is added to this example to ensure that the consumer has time to finish closing before finishing shutdown.
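The example itself was stripped from this copy; a sketch of the described pattern (shutdown flag, latch, and `close()` in a `Runnable`; the topic name and record handling are placeholders):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ConsumerLoop implements Runnable {
    private final KafkaConsumer<String, String> consumer;
    private final AtomicBoolean shutdown = new AtomicBoolean(false);
    private final CountDownLatch shutdownLatch = new CountDownLatch(1);

    public ConsumerLoop(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void run() {
        try {
            consumer.subscribe(Collections.singletonList("my-topic"));
            // the shutdown flag is checked once per loop iteration
            while (!shutdown.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                records.forEach(record -> System.out.println(record.value()));
            }
        } finally {
            consumer.close();          // clean up sockets, trigger immediate rebalance
            shutdownLatch.countDown(); // signal that close() has finished
        }
    }

    public void shutdown() throws InterruptedException {
        shutdown.set(true);
        shutdownLatch.await(); // wait for the loop to exit and close the consumer
    }
}
```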
Shutdown with wakeup

An alternative pattern for the poll loop in the Java consumer is to use `Long.MAX_VALUE` for the timeout. To break from the loop, you can use the consumer's `wakeup()` method from a separate thread. This will raise a `WakeupException` from the thread blocking in `poll()`. If the thread is not currently blocking, then this will wake up the next poll invocation.
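A sketch of the wakeup variant, replacing the `run()` and `shutdown()` methods of the `ConsumerLoop` sketch above:

```java
import java.time.Duration;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.WakeupException;

@Override
public void run() {
    try {
        consumer.subscribe(Collections.singletonList("my-topic"));
        while (true) {
            // block indefinitely; wakeup() from another thread breaks us out
            ConsumerRecords<String, String> records =
                consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
            records.forEach(record -> System.out.println(record.value()));
        }
    } catch (WakeupException e) {
        // expected on shutdown; fall through to close
    } finally {
        consumer.close();
        shutdownLatch.countDown();
    }
}

public void shutdown() throws InterruptedException {
    consumer.wakeup(); // interrupts the blocking poll()
    shutdownLatch.await();
}
```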
Synchronous commits

The simplest and most reliable way to manually commit offsets is using a synchronous commit with `commitSync()`. As its name suggests, this method blocks until the commit has completed successfully. In the sketch that follows this discussion, a try/catch block is added around the call to `commitSync`. The `CommitFailedException` is thrown when the commit cannot be completed because the group has been rebalanced. This is the main thing to be careful of when using the Java client. Since all network IO (including heartbeating) and message processing is done in the foreground, it is possible for the session timeout to expire while a batch of messages is being processed. To handle this, you have two choices.

First you can adjust the `session.timeout.ms` setting to ensure that the handler has enough time to finish processing messages. You can then tune `max.partition.fetch.bytes` to limit the amount of data returned in a single batch, though you will have to consider how many partitions are in the subscribed topics.

The second option is to do message processing in a separate thread, but you will have to manage flow control to ensure that the threads can keep up. For example, just pushing messages into a blocking queue would probably not be sufficient unless the rate of processing can keep up with the rate of delivery (in which case you might not need a separate thread anyway). It may even exacerbate the problem if the poll loop is stuck blocking on a call to `offer()` while the background thread is handling an even larger batch of messages. The Java API offers a `pause()` method to help in these situations.

For now, you should set `session.timeout.ms` large enough that commit failures from rebalances are rare. As mentioned above, the only drawback to this is a longer delay before partitions can be re-assigned in the event of a hard failure (where the consumer cannot be cleanly shut down with `close()`). This should be rare in practice.

You should be careful in this example since the `wakeup()` might be triggered while the commit is pending. The recursive call is safe since the wakeup will only be triggered once.
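The example the text walks through was stripped; a sketch of the pattern, including the recursive `WakeupException` handling described above:

```java
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.common.errors.WakeupException;

private void doCommitSync() {
    try {
        consumer.commitSync(); // blocks until the commit succeeds or fails
    } catch (WakeupException e) {
        // we're shutting down, but finish the commit first and then
        // rethrow the exception so the main loop can exit
        doCommitSync();
        throw e;
    } catch (CommitFailedException e) {
        // the commit failed because the group rebalanced; ignore it or
        // run any rollback logic your application needs
    }
}
```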
Delivery guarantees

In the previous example, you get “at least once” delivery since the commit follows the message processing. By changing the order, however, you can get “at most once” delivery. But you must be a little careful with the commit failure, so you should change `doCommitSync` to return whether or not the commit succeeded (see the sketch below). There's also no longer any need to catch the `WakeupException` in the synchronous commit. Correct offset management is crucial because it affects delivery semantics.
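A sketch of the two orderings; `process` is a hypothetical handler, and `doCommitSync` is assumed to have been modified as described to return success or failure:

```java
// at-least-once: process first, then commit; a crash between the two
// steps means records are redelivered and processed again
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
records.forEach(record -> process(record));
doCommitSync();

// at-most-once: commit first, then process; a crash between the two
// steps means the uncommitted work is lost rather than repeated
ConsumerRecords<String, String> more = consumer.poll(Duration.ofMillis(500));
if (doCommitSync()) {
    more.forEach(record -> process(record));
}
```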
Asynchronous commits
The API gives you a callback which is invoked when the commit either succeeds or fails:
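A sketch of an asynchronous commit with its callback:

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
        if (exception != null) {
            // async commits are not retried; log the failure and rely
            // on a later commit to advance the offsets
            exception.printStackTrace();
        }
    }
});
```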
In the example below, synchronous commits are incorporated on rebalances and on close. For this, the `subscribe()` method has a variant which accepts a `ConsumerRebalanceListener`, which has two methods to hook into rebalance behavior.
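The example itself is missing from this copy; a sketch of the described pattern, committing synchronously in the rebalance hook (`doCommitSync` is the helper from the synchronous-commits section):

```java
import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

consumer.subscribe(Collections.singletonList("my-topic"), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // commit processed offsets before losing the partitions
        doCommitSync();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // nothing to do when partitions are assigned
    }
});
```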
Suggested Reading

Blog post: Multi-Threaded Message Consumption with the Apache Kafka Consumer