Big Data | Install and configure Apache Kafka (3.4.0)
  1. Prerequisites
  2. Install Kafka
  3. How to configure Kafka?
  4. How to start/stop Kafka?
  5. How to install and configure a Kafka cluster on the same host?
  6. Common Kafka startup errors
  7. Cleanup Kafka
  8. How to create topics?
  9. How to monitor topics?
  10. How to delete topics?
  11. How to send (produce) a message to Kafka?
  12. How to send a very large file to Kafka?
  13. How to receive (consume) a message from kafka?
  14. How to monitor consumer groups?
  15. How to get an entity configuration?
  16. How to add/update/delete an entity configuration?
  17. Kafka Tool

  1. Prerequisites
    ZooKeeper is a prerequisite and should be up and running before starting Kafka.
    See this page for more details on how to install Zookeeper: Apache ZooKeeper
  2. Install Kafka
    Download Apache Kafka: https://kafka.apache.org/downloads

    Extract the file "kafka_2.13-3.4.0.tgz" in the folder you want to install Kafka: /opt/kafka_2.13-3.4.0
    $ tar -xf ~/Downloads/kafka_2.13-3.4.0.tgz -C /opt/

    Note: In the following sections, the environment variable ${KAFKA_HOME} will refer to this location "/opt/kafka_2.13-3.4.0"

    $ export KAFKA_HOME=/opt/kafka_2.13-3.4.0
  3. How to configure Kafka?
    Create the data folder "kafka-logs": "${KAFKA_HOME}/kafka-logs"
    Create the log folder "logs": "${KAFKA_HOME}/logs"

    $ mkdir ${KAFKA_HOME}/kafka-logs
    $ mkdir ${KAFKA_HOME}/logs

    Configure the Kafka server properties file server.properties by adjusting the following properties:
    $ vi ${KAFKA_HOME}/config/server.properties
    broker.id=0
    listeners=PLAINTEXT://localhost:9092
    log.dirs=/opt/kafka_2.13-3.4.0/kafka-logs
    zookeeper.connect=localhost:2181/kafka

    Notes:
    • Leave listener hostname empty to bind to default interface:
      listeners=PLAINTEXT://:9092

    • Specify listener hostname as 0.0.0.0 to bind to all interfaces:
      listeners=PLAINTEXT://0.0.0.0:9092

    • Specify multiple listeners:
      listeners=PLAINTEXT://myhost:9092,SSL://:9091,CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
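    When clients connect from another machine, the broker must advertise an address those clients can reach; binding and advertising are separate settings. A minimal fragment (a sketch; "kafka1.example.com" is a placeholder hostname, not part of this setup):

```properties
# bind on all interfaces, but tell clients to connect back via the public hostname
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://kafka1.example.com:9092
```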
  4. How to start/stop Kafka?
    • Start Kafka server:
      $ ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties

      • Validate that Kafka is up/running and listening on port "9092":
        $ netstat -lp | grep 9092
        tcp6       0      0 localhost:9092          [::]:*                  LISTEN      31402/java

      • Validate that Kafka was registered properly with ZooKeeper:
        $ ${ZK_HOME}/bin/zkCli.sh -server localhost:2181 get /kafka/brokers/ids/0
        {
          "listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},
          "endpoints":["PLAINTEXT://localhost:9092"],
          "jmx_port":-1,
          "host":"localhost",
          "timestamp":"1567958157584",
          "port":9092,
          "version":4
        }

    • Stop Kafka server:
      $ ${KAFKA_HOME}/bin/kafka-server-stop.sh
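    The start/stop commands above can be wrapped in two small shell functions (a minimal sketch; assumes ${KAFKA_HOME} is set as in section 2):

```shell
# start/stop helpers for a single local broker
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka_2.13-3.4.0}"

# start the broker as a daemon using the default properties file
kafka_start() {
  "${KAFKA_HOME}/bin/kafka-server-start.sh" -daemon "${KAFKA_HOME}/config/server.properties"
}

# signal the broker to shut down gracefully
kafka_stop() {
  "${KAFKA_HOME}/bin/kafka-server-stop.sh"
}
```

    Source the file (". kafka-ctl.sh") and call kafka_start / kafka_stop.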
  5. How to install and configure a Kafka cluster on the same host?
    The following examples show how to configure a cluster of Kafka instances on the same host.

    Extract the file "kafka_2.13-3.4.0.tgz" in the folders you want to install Kafka:
    $ mkdir /opt/kafka-i0
    $ mkdir /opt/kafka-i1
    $ mkdir /opt/kafka-i2

    $ tar -xf ~/Downloads/kafka_2.13-3.4.0.tgz -C /opt/kafka-i0
    $ tar -xf ~/Downloads/kafka_2.13-3.4.0.tgz -C /opt/kafka-i1
    $ tar -xf ~/Downloads/kafka_2.13-3.4.0.tgz -C /opt/kafka-i2

    Create the folders "kafka-logs" and "logs" for each instance:
    $ mkdir /opt/kafka-i0/kafka_2.13-3.4.0/kafka-logs
    $ mkdir /opt/kafka-i0/kafka_2.13-3.4.0/logs

    $ mkdir /opt/kafka-i1/kafka_2.13-3.4.0/kafka-logs
    $ mkdir /opt/kafka-i1/kafka_2.13-3.4.0/logs

    $ mkdir /opt/kafka-i2/kafka_2.13-3.4.0/kafka-logs
    $ mkdir /opt/kafka-i2/kafka_2.13-3.4.0/logs

    Configure the Kafka server properties file server.properties for each instance; broker.id, listeners, and log.dirs are specific to each instance:
    $ vi /opt/kafka-i0/kafka_2.13-3.4.0/config/server.properties
    broker.id=0
    listeners=PLAINTEXT://localhost:9092
    log.dirs=/opt/kafka-i0/kafka_2.13-3.4.0/kafka-logs
    zookeeper.connect=localhost:2181/kafka

    $ vi /opt/kafka-i1/kafka_2.13-3.4.0/config/server.properties
    broker.id=1
    listeners=PLAINTEXT://localhost:9192
    log.dirs=/opt/kafka-i1/kafka_2.13-3.4.0/kafka-logs
    zookeeper.connect=localhost:2181/kafka

    $ vi /opt/kafka-i2/kafka_2.13-3.4.0/config/server.properties
    broker.id=2
    listeners=PLAINTEXT://localhost:9292
    log.dirs=/opt/kafka-i2/kafka_2.13-3.4.0/kafka-logs
    zookeeper.connect=localhost:2181/kafka

    Start Kafka servers:
    $ cd /opt/kafka-i0/kafka_2.13-3.4.0 && ./bin/kafka-server-start.sh -daemon ./config/server.properties
    $ cd /opt/kafka-i1/kafka_2.13-3.4.0 && ./bin/kafka-server-start.sh -daemon ./config/server.properties
    $ cd /opt/kafka-i2/kafka_2.13-3.4.0 && ./bin/kafka-server-start.sh -daemon ./config/server.properties

    Stop Kafka servers:
    $ cd /opt/kafka-i0/kafka_2.13-3.4.0 && ./bin/kafka-server-stop.sh
    $ cd /opt/kafka-i1/kafka_2.13-3.4.0 && ./bin/kafka-server-stop.sh
    $ cd /opt/kafka-i2/kafka_2.13-3.4.0 && ./bin/kafka-server-stop.sh
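    The per-instance commands above can be scripted with a loop (a sketch; assumes the /opt/kafka-iN layout from this section, and skips instances that are not installed):

```shell
# start every local Kafka instance in one pass
for i in 0 1 2; do
  dir="/opt/kafka-i${i}/kafka_2.13-3.4.0"
  if [ -x "${dir}/bin/kafka-server-start.sh" ]; then
    "${dir}/bin/kafka-server-start.sh" -daemon "${dir}/config/server.properties"
  else
    echo "skipping ${dir}: not installed" >&2
  fi
done
```

    Substituting kafka-server-stop.sh (without -daemon and the properties file) stops the instances the same way.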
  6. Common Kafka startup errors
    The following error indicates that the configured broker.id is already in use. It also occurs when a Kafka server is shut down and restarted before its ZooKeeper session times out, so the broker appears to re-register while the old registration still exists.
    FATAL [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
    java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/0.

    The following error indicates that a process is already running using the same port:
    FATAL [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
    kafka.common.KafkaException: Socket server failed to bind to localhost:9092: Address already in use.
  7. Cleanup Kafka
    $ ${KAFKA_HOME}/bin/kafka-server-stop.sh

    $ rm -rf ${KAFKA_HOME}/kafka-logs/*
    $ rm -rf ${KAFKA_HOME}/logs/*

    $ ${ZK_HOME}/bin/zkCli.sh -server localhost:2181 deleteall /kafka
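    The rm -rf commands above expand ${KAFKA_HOME}; if the variable is unset, the globs resolve to paths directly under "/". A guarded sketch of the stop-and-wipe steps as a reusable function:

```shell
# cleanup helper: refuses to run when KAFKA_HOME is unset,
# so the rm -rf globs can never expand from an empty variable
kafka_cleanup() {
  if [ -z "${KAFKA_HOME}" ]; then
    echo "KAFKA_HOME is not set" >&2
    return 1
  fi
  "${KAFKA_HOME}/bin/kafka-server-stop.sh"
  rm -rf "${KAFKA_HOME}/kafka-logs"/* "${KAFKA_HOME}/logs"/*
}
```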
  8. How to create topics?
    Create a topic with one partition:
    $ ${KAFKA_HOME}/bin/kafka-topics.sh --create \
    --topic topic1 --partitions 1 --replication-factor 1 \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
    Created topic topic1.

    Describe topic1:
    $ ${KAFKA_HOME}/bin/kafka-topics.sh --describe \
    --topic topic1 \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
    Topic: topic1 | TopicId: qYBDRmzmT8GYae3Kz41WQw | PartitionCount: 1 | ReplicationFactor: 1 | Configs:
        Topic: topic1 | Partition: 0 | Leader: 0 | Replicas: 0 | Isr: 0

    Create a topic with two partitions and a replication factor of three:
    $ ${KAFKA_HOME}/bin/kafka-topics.sh --create \
    --topic topic2 --partitions 2 --replication-factor 3 \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
    Created topic topic2.

    Describe topic2:
    $ ${KAFKA_HOME}/bin/kafka-topics.sh --describe \
    --topic topic2 \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
    Topic: topic2 | TopicId: yH9U2HJ1QrWNODyQTzZjCg | PartitionCount: 2 | ReplicationFactor: 3 | Configs:
        Topic: topic2 | Partition: 0 | Leader: 1 | Replicas: 1,0,2 | Isr: 1,0,2
        Topic: topic2 | Partition: 1 | Leader: 0 | Replicas: 0,2,1 | Isr: 0,2,1

    Add new partitions to a topic (note: if partitions are added to a topic with keyed messages, the key-to-partition mapping changes, so per-key message ordering is no longer guaranteed):
    $ ${KAFKA_HOME}/bin/kafka-topics.sh --alter \
    --topic topic2 --partitions 3 \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"

    Describe topic2:
    $ ${KAFKA_HOME}/bin/kafka-topics.sh --describe \
    --topic topic2 \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
    Topic: topic2 | TopicId: yH9U2HJ1QrWNODyQTzZjCg | PartitionCount: 3 | ReplicationFactor: 3 | Configs:
        Topic: topic2 | Partition: 0 | Leader: 1 | Replicas: 1,0,2 | Isr: 1,0,2
        Topic: topic2 | Partition: 1 | Leader: 0 | Replicas: 0,2,1 | Isr: 0,2,1
        Topic: topic2 | Partition: 2 | Leader: 0 | Replicas: 0,2,1 | Isr: 0,2,1
  9. How to monitor topics?
    List topics:
    $ ${KAFKA_HOME}/bin/kafka-topics.sh --list \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
    __consumer_offsets
    topic1
    topic2

    Describe a topic (name, partitions count, replication factor, partitions list/configurations):
    $ ${KAFKA_HOME}/bin/kafka-topics.sh --describe \
    --topic topic2 \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
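    The total number of messages in a topic can be derived from its partition end offsets. kafka-get-offsets.sh (bundled with recent Kafka releases; --time -1 means "latest") prints one "topic:partition:offset" line per partition, which awk can sum (a sketch; assumes no records were removed by retention or compaction):

```shell
# sum the end offsets of all partitions of topic2
${KAFKA_HOME}/bin/kafka-get-offsets.sh \
--topic topic2 --time -1 \
--bootstrap-server "localhost:9092,localhost:9192,localhost:9292" \
| awk -F: '{ total += $3 } END { print "end offsets total:", total+0 }'
```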
  10. How to delete topics?
    $ ${KAFKA_HOME}/bin/kafka-topics.sh --delete \
    --topic topic1 \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
  11. How to send (produce) a message to Kafka?
    Using the console to produce a message to Kafka:
    $ ${KAFKA_HOME}/bin/kafka-console-producer.sh \
    --topic topic2 \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
    >Hello Kafka World...
    >Hello Again...
    >^C

    When you read the messages with a consumer, the output should look as follows:
    Hello Kafka World...
    Hello Again...

    Using a file to produce messages to Kafka (each line in the file is sent as a separate message):
    $ echo "Message From A File" > /tmp/kafka-messages.txt
    $ echo "Another message From The Same File" >> /tmp/kafka-messages.txt
    $ ${KAFKA_HOME}/bin/kafka-console-producer.sh \
    --topic topic2 \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292" \
    < /tmp/kafka-messages.txt

    When you read the messages with a consumer, the output should look as follows (the order of the messages might differ):
    Message From A File
    Another message From The Same File

    Using a file with "key:value" lines to produce keyed messages to Kafka (each line in the file is sent as a separate message):
    $ echo "msgkey:Key-Value Message From A File" > /tmp/kafka-messages-key-value.txt
    $ echo "anothermsgkey:Another Key-Value Message From A File" >> /tmp/kafka-messages-key-value.txt
    $ ${KAFKA_HOME}/bin/kafka-console-producer.sh \
    --topic topic2 --property "parse.key=true" --property "key.separator=:" \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292" \
     < /tmp/kafka-messages-key-value.txt

    When you read the messages with a consumer, the output should look as follows (the order of the messages might differ):
    Key-Value Message From A File
    Another Key-Value Message From A File

    Producing "key:value" messages from the command line:
    $ for (( i=0; i<5; i++ )); do echo "key$i:value$i" | \
    ${KAFKA_HOME}/bin/kafka-console-producer.sh \
    --topic topic2 --property "parse.key=true" --property "key.separator=:" \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292" \
    ; done;

    When you read the messages with a consumer, the output should look as follows (the order of the messages might differ):
    value0
    value1
    value2
    value3
    value4
  12. How to send a very large file to Kafka?
    You need to increase the producer property max.request.size in the file ${KAFKA_HOME}/config/producer.properties:
    # the maximum size of a request in bytes
    max.request.size=987654321

    You also need to increase the broker property message.max.bytes in the file ${KAFKA_HOME}/config/server.properties:
    message.max.bytes=987654321

    Consumers reading the topic may also need fetch.max.bytes and max.partition.fetch.bytes raised to at least the same value.
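    With the limits raised, a large payload can be sent as one message. The console producer treats each input line as a separate record, so the payload must be a single line (a sketch; base64 keeps the random data printable, and the guard lets the script no-op on machines where Kafka is not installed):

```shell
# build a ~10 MB single-line payload (8,000,000 random bytes -> ~10.7 MB of base64)
head -c 8000000 /dev/urandom | base64 | tr -d '\n' > /tmp/kafka-large-message.txt

# send the whole file as one message, using the adjusted producer properties
KAFKA_HOME="${KAFKA_HOME:-/opt/kafka_2.13-3.4.0}"
if [ -x "${KAFKA_HOME}/bin/kafka-console-producer.sh" ]; then
  "${KAFKA_HOME}/bin/kafka-console-producer.sh" \
  --topic topic2 \
  --producer.config "${KAFKA_HOME}/config/producer.properties" \
  --bootstrap-server "localhost:9092,localhost:9192,localhost:9292" \
  < /tmp/kafka-large-message.txt
fi
```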
  13. How to receive (consume) a message from kafka?
    Consume new messages from a specific topic using a custom "consumer.properties" file:
    $ echo "group.id=group-id-1" > /tmp/consumer-group-id-1.properties
    $ ${KAFKA_HOME}/bin/kafka-console-consumer.sh \
    --topic topic2 \
    --consumer.config "/tmp/consumer-group-id-1.properties" \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"

    Consume all messages from a specific topic using the option "from-beginning":
    $ ${KAFKA_HOME}/bin/kafka-console-consumer.sh \
    --topic topic2 --from-beginning \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"

    Consume messages from a specific topic using custom properties:
    $ ${KAFKA_HOME}/bin/kafka-console-consumer.sh \
    --topic topic2 \
    --from-beginning \
    --formatter kafka.tools.DefaultMessageFormatter \
    --property group.id=group-id-1 \
    --property print.key=true \
    --property print.value=true \
    --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"

    Consume messages from a specific partition and offset:
    $ ${KAFKA_HOME}/bin/kafka-console-consumer.sh \
    --topic topic2 --offset latest --partition 0 \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
    $ ${KAFKA_HOME}/bin/kafka-console-consumer.sh \
    --topic topic2 --offset 1 --partition 0 \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
  14. How to monitor consumer groups?
    List all consumer groups (this lists groups that store their offsets in Kafka; old ZooKeeper-based consumers are not shown):
    $ ${KAFKA_HOME}/bin/kafka-consumer-groups.sh --list \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
    console-consumer-82209
    group-id-1

    Describe a consumer group (partition offsets, lag, and active members; only groups that store their offsets in Kafka are shown):
    $ ${KAFKA_HOME}/bin/kafka-consumer-groups.sh --describe \
    --group group-id-1 \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"

    Consumer group 'group-id-1' has no active members.
    
    GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
    group-id-1      topic2          0          9               10              1               -               -               -
    group-id-1      topic2          1          8               10              2               -               -               -
    group-id-1      topic2          2          4               6               2               -               -               -
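    The LAG column of the --describe output (column 6) can be summed into a single health number with awk (a sketch; lines whose sixth field is not a plain integer, such as the header, are skipped):

```shell
# print the total lag of group-id-1 across all topic partitions
${KAFKA_HOME}/bin/kafka-consumer-groups.sh --describe \
--group group-id-1 \
--bootstrap-server "localhost:9092,localhost:9192,localhost:9292" \
| awk '$6 ~ /^[0-9]+$/ { lag += $6 } END { print "total lag:", lag+0 }'
```

    For the sample output above this prints "total lag: 5".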
  15. How to get an entity configuration?
    Get a topic configuration:
    $ ${KAFKA_HOME}/bin/kafka-configs.sh --describe \
    --entity-name topic2 --entity-type topics \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"

    Get a broker configuration:
    $ ${KAFKA_HOME}/bin/kafka-configs.sh --describe \
    --entity-name 0 --entity-type brokers \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
  16. How to add/update/delete an entity configuration?
    Add/update new/existing configuration of a specific topic:
    $ ${KAFKA_HOME}/bin/kafka-configs.sh --alter \
    --add-config 'retention.ms=1000' --entity-name topic2 --entity-type topics \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"

    Get the topic configuration:
    $ ${KAFKA_HOME}/bin/kafka-configs.sh --describe \
    --entity-name topic2 --entity-type topics \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
    Dynamic configs for topic topic2 are:
      retention.ms=1000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=1000}

    Delete a configuration from a specific topic:
    $ ${KAFKA_HOME}/bin/kafka-configs.sh --alter \
    --delete-config retention.ms --entity-name topic2 --entity-type topics \
    --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
  17. Kafka Tool
    Download Kafka Tool: http://www.kafkatool.com/download.html

    Copy Kafka Tool into the folder where you want to install it: /opt/kafkatool

    Run Kafka Tool:
    $ /opt/kafkatool/kafkatool

    Broker (Properties):
    - id (0)
    - host (localhost)
    - Port (9092)

    Topic (Properties):
    - Name (topic2)
    - Key Content Type (No Key, String, Byte Array)
    - Message Content Type (String, Byte Array)
    - Total number of messages

    Topic (Data):
    - Partition (0)
    - Offset (0)
    - Key
    - Message

    Topic (Partitions):
    - id (0)
    - Leader (localhost:9092)

    Consumers (Partitions):
    - Id (group-id-1)
    - Consumer Type (kafka)
    - Offsets stored in (kafka)

    Consumers (Offsets):
    - Topic (topic2)
    - Partition (0)
    - Start (offset)
    - End (offset)
    - Offset
    - Lag
© 2025  mtitek