(3.4.0)
kafka_2.13-3.4.0.tgz
" in the folder you want to install Kafka: /opt/kafka_2.13-3.4.0
$ tar -xf ~/Downloads/kafka_2.13-3.4.0.tgz -C /opt/
${KAFKA_HOME}
will refer to this location "/opt/kafka_2.13-3.4.0
"$ export KAFKA_HOME=/opt/kafka_2.13-3.4.0
kafka-logs
": "${KAFKA_HOME}/kafka-logs
"logs
": "${KAFKA_HOME}/logs
"$ mkdir ${KAFKA_HOME}/kafka-logs $ mkdir ${KAFKA_HOME}/logs
server.properties
, by adjusting the following properties:$ vi ${KAFKA_HOME}/config/server.properties
broker.id=0 listeners=PLAINTEXT://localhost:9092 log.dirs=/opt/kafka_2.13-3.4.0/kafka-logs zookeeper.connect=localhost:2181/kafka
listeners=PLAINTEXT://:9092
0.0.0.0
to bind to all interfaces:
listeners=PLAINTEXT://0.0.0.0:9092
listeners=PLAINTEXT://myhost:9092,SSL://:9091,CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093
$ ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties
9092
":$ netstat -lp | grep 9092
tcp6 0 0 localhost:9092 [::]:* LISTEN 31402/java
$ ${ZK_HOME}/bin/zkCli.sh -server localhost:2181 get /kafka/brokers/ids/0
{ "listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"}, "endpoints":["PLAINTEXT://localhost:9092"], "jmx_port":-1, "host":"localhost", "timestamp":"1567958157584", "port":9092, "version":4 }
$ ${KAFKA_HOME}/bin/kafka-server-stop.sh
kafka_2.13-3.4.0.tgz
" in the folders you want to install Kafka:$ mkdir /opt/kafka-i0 $ mkdir /opt/kafka-i1 $ mkdir /opt/kafka-i2
$ tar -xf ~/Downloads/kafka_2.13-3.4.0.tgz -C /opt/kafka-i0 $ tar -xf ~/Downloads/kafka_2.13-3.4.0.tgz -C /opt/kafka-i1 $ tar -xf ~/Downloads/kafka_2.13-3.4.0.tgz -C /opt/kafka-i2
kafka-logs
" and "logs
" for each instance:$ mkdir /opt/kafka-i0/kafka_2.13-3.4.0/kafka-logs $ mkdir /opt/kafka-i0/kafka_2.13-3.4.0/logs
$ mkdir /opt/kafka-i1/kafka_2.13-3.4.0/kafka-logs $ mkdir /opt/kafka-i1/kafka_2.13-3.4.0/logs
$ mkdir /opt/kafka-i2/kafka_2.13-3.4.0/kafka-logs $ mkdir /opt/kafka-i2/kafka_2.13-3.4.0/logs
server.properties
,
by adjusting the following properties for each instance
(the broker.id, listeners, log.dirs are specific to each instance):$ vi /opt/kafka-i0/kafka_2.13-3.4.0/config/server.properties
broker.id=0 listeners=PLAINTEXT://localhost:9092 log.dirs=/opt/kafka-i0/kafka_2.13-3.4.0/kafka-logs zookeeper.connect=localhost:2181/kafka
$ vi /opt/kafka-i1/kafka_2.13-3.4.0/config/server.properties
broker.id=1 listeners=PLAINTEXT://localhost:9192 log.dirs=/opt/kafka-i1/kafka_2.13-3.4.0/kafka-logs zookeeper.connect=localhost:2181/kafka
$ vi /opt/kafka-i2/kafka_2.13-3.4.0/config/server.properties
broker.id=2 listeners=PLAINTEXT://localhost:9292 log.dirs=/opt/kafka-i2/kafka_2.13-3.4.0/kafka-logs zookeeper.connect=localhost:2181/kafka
$ cd /opt/kafka-i0/kafka_2.13-3.4.0 && ./bin/kafka-server-start.sh -daemon ./config/server.properties
$ cd /opt/kafka-i1/kafka_2.13-3.4.0 && ./bin/kafka-server-start.sh -daemon ./config/server.properties
$ cd /opt/kafka-i2/kafka_2.13-3.4.0 && ./bin/kafka-server-start.sh -daemon ./config/server.properties
$ cd /opt/kafka-i0/kafka_2.13-3.4.0 && ./bin/kafka-server-stop.sh
$ cd /opt/kafka-i1/kafka_2.13-3.4.0 && ./bin/kafka-server-stop.sh
$ cd /opt/kafka-i2/kafka_2.13-3.4.0 && ./bin/kafka-server-stop.sh
broker.id
is already in use,
or Kafka server was shutdown and restarted but ZooKeeper didn't timeout yet so it appears that Kafka is re-registering.
FATAL [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) java.lang.RuntimeException: A broker is already registered on the path /brokers/ids/0.
FATAL [Kafka Server 0], Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer) kafka.common.KafkaException: Socket server failed to bind to localhost:9092: Address already in use.
$ ${KAFKA_HOME}/bin/kafka-server-stop.sh
$ rm -rf ${KAFKA_HOME}/kafka-logs/* $ rm -rf ${KAFKA_HOME}/logs/*
$ ${ZK_HOME}/bin/zkCli.sh -server localhost:2181 deleteall /kafka
$ ${KAFKA_HOME}/bin/kafka-topics.sh --create \ --topic topic1 --partitions 1 --replication-factor 1 \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
Created topic topic1.
$ ${KAFKA_HOME}/bin/kafka-topics.sh --describe \ --topic topic1 \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
Topic: topic1 | TopicId: qYBDRmzmT8GYae3Kz41WQw | PartitionCount: 1 | ReplicationFactor: 1 | Configs: Topic: topic1 | Partition: 0 | Leader: 0 | Replicas: 0 | Isr: 0
$ ${KAFKA_HOME}/bin/kafka-topics.sh --create \ --topic topic2 --partitions 2 --replication-factor 3 \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
Created topic topic2.
$ ${KAFKA_HOME}/bin/kafka-topics.sh --describe \ --topic topic2 \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
Topic: topic2 | TopicId: yH9U2HJ1QrWNODyQTzZjCg | PartitionCount: 2 | ReplicationFactor: 3 | Configs: Topic: topic2 | Partition: 0 | Leader: 1 | Replicas: 1,0,2 | Isr: 1,0,2 Topic: topic2 | Partition: 1 | Leader: 0 | Replicas: 0,2,1 | Isr: 0,2,1
$ ${KAFKA_HOME}/bin/kafka-topics.sh --alter \ --topic topic2 --partitions 3 \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
$ ${KAFKA_HOME}/bin/kafka-topics.sh --describe \ --topic topic2 \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
Topic: topic2 | TopicId: yH9U2HJ1QrWNODyQTzZjCg | PartitionCount: 3 | ReplicationFactor: 3 | Configs: Topic: topic2 | Partition: 0 | Leader: 1 | Replicas: 1,0,2 | Isr: 1,0,2 Topic: topic2 | Partition: 1 | Leader: 0 | Replicas: 0,2,1 | Isr: 0,2,1 Topic: topic2 | Partition: 2 | Leader: 0 | Replicas: 0,2,1 | Isr: 0,2,1
$ ${KAFKA_HOME}/bin/kafka-topics.sh --list \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
__consumer_offsets topic1 topic2
$ ${KAFKA_HOME}/bin/kafka-topics.sh --describe \ --topic topic2 \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
$ ${KAFKA_HOME}/bin/kafka-topics.sh --delete \ --topic topic1 \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
$ ${KAFKA_HOME}/bin/kafka-console-producer.sh \ --topic topic2 \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
>Hello Kafka World... >Hello Again... >^C
Hello Kafka World... Hello Again...
$ echo "Message From A File" > /tmp/kafka-messages.txt $ echo "Another message From The Same File" >> /tmp/kafka-messages.txt
$ ${KAFKA_HOME}/bin/kafka-console-producer.sh \ --topic topic2 \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292" \ < /tmp/kafka-messages.txt
Message From A File Another message From The Same File
$ echo "msgkey:Key-Value Message From A File" > /tmp/kafka-messages-key-value.txt $ echo "anothermsgkey:Another Key-Value Message From A File" >> /tmp/kafka-messages-key-value.txt
$ ${KAFKA_HOME}/bin/kafka-console-producer.sh \ --topic topic2 --property "parse.key=true" --property "key.separator=:" \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292" \ < /tmp/kafka-messages-key-value.txt
Key-Value Message From A File Another Key-Value Message From A File
$ for (( i=0; i<5; i++ )); do echo "key$i:value$i" | \ ${KAFKA_HOME}/bin/kafka-console-producer.sh \ --topic topic2 --property "parse.key=true" --property "key.separator=:" \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292" \ ; done;
value0 value1 value2 value3 value4
max.request.size
of the file ${KAFKA_HOME}/config/producer.properties
# the maximum size of a request in bytes max.request.size=987654321
message.max.bytes
of the file ${KAFKA_HOME}/config/zookeeper.properties
message.max.bytes=987654321
consumer.properties
" file:$ echo "group.id=group-id-1" > /tmp/consumer-group-id-1.properties
$ ${KAFKA_HOME}/bin/kafka-console-consumer.sh \ --topic topic2 \ --consumer.config "/tmp/consumer-group-id-1.properties" \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
from-beginning
":$ ${KAFKA_HOME}/bin/kafka-console-consumer.sh \ --topic topic2 --from-beginning \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
$ ${KAFKA_HOME}/bin/kafka-console-consumer.sh \ --topic topic2 \ --from-beginning \ --formatter kafka.tools.DefaultMessageFormatter \ --property group.id=group-id-1 \ --property print.key=true \ --property print.value=true \ --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --property value.deserializer=org.apache.kafka.common.serialization.StringDeserializer \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
$ ${KAFKA_HOME}/bin/kafka-console-consumer.sh \ --topic topic2 --offset latest --partition 0 \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
$ ${KAFKA_HOME}/bin/kafka-console-consumer.sh \ --topic topic2 --offset 1 --partition 0 \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
$ ${KAFKA_HOME}/bin/kafka-console-consumer.sh \ --topic topic2 --consumer-property "num.partitions"=0 \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
$ ${KAFKA_HOME}/bin/kafka-consumer-groups.sh --list \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
console-consumer-82209 group-id-1
$ ${KAFKA_HOME}/bin/kafka-consumer-groups.sh --describe \ --group group-id-1 \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
Consumer group 'group-id-1' has no active members. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID group-id-1 topic2 0 9 10 1 - - - group-id-1 topic2 1 8 10 2 - - - group-id-1 topic2 2 4 6 2 - - -
$ ${KAFKA_HOME}/bin/kafka-configs.sh --describe \ --entity-name topic2 --entity-type topics \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
$ ${KAFKA_HOME}/bin/kafka-configs.sh --describe \ --entity-name 0 --entity-type brokers \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
$ ${KAFKA_HOME}/bin/kafka-configs.sh --alter \ --add-config 'retention.ms=1000' --entity-name topic2 --entity-type topics \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
$ ${KAFKA_HOME}/bin/kafka-configs.sh --describe \ --entity-name topic2 --entity-type topics \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
Dynamic configs for topic topic2 are: retention.ms=1000 sensitive=false synonyms={DYNAMIC_TOPIC_CONFIG:retention.ms=1000}
$ ${KAFKA_HOME}/bin/kafka-configs.sh --alter \ --delete-config retention.ms --entity-name topic2 --entity-type topics \ --bootstrap-server "localhost:9092,localhost:9192,localhost:9292"
/opt/kafkatool
$ /opt/kafkatool/kafkatool
- id (0) - host (localhost) - Port (9092)
- Name (topic2) - Key Content Type (No Key, String, Byte Array) - Message Content Type (String, Byte Array) - Total number of messages
- Partition (0) - Offset (0) - Key - Message
- id (0) - Leader (localhost:9092)
- Id (group-id-1) - Consumer Type (kafka) - Offsets stored in (kafka)
- Topic (topic2) - Partition (0) - Start (offset) - End (offset) - Offset - Lag