下载 Kafka

下载 最新的版本

Terminal window
wget https://downloads.apache.org/kafka/3.8.0/kafka_2.12-3.8.0.tgz
tar -xzf kafka_2.12-3.8.0.tgz
cd kafka_2.12-3.8.0

启动 Kafka

Apache Kafka 需要 Java 8+,可以使用 KRaft 或 ZooKeeper 启动。

Kafka 与 KRaft

生成集群 UUID:

Terminal window
KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

格式化日志目录:

Terminal window
bin/kafka-storage.sh format -t $KAFKA_CLUSTER_ID -c config/kraft/server.properties

启动Kafka服务器:

Terminal window
bin/kafka-server-start.sh config/kraft/server.properties

Kafka 与 ZooKeeper

依次启动 ZooKeeper、Kafka:

Terminal window
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties

使用基于 JVM 的 Docker 镜像

获取 Docker 映像,启动 Kafka Docker 容器:

Terminal window
docker pull apache/kafka:3.8.0
docker run -p 9092:9092 apache/kafka:3.8.0

使用基于 GraalVM 的原生 Docker 镜像

获取 Docker 映像,启动 Kafka Docker 容器:

Terminal window
docker pull apache/kafka-native:3.8.0
docker run -p 9092:9092 apache/kafka-native:3.8.0

使用 Docker Compose

services:
kafka:
image: apache/kafka:3.8.0
ports:
- "9092:9092"

下面文件内容来自:Apache Kafka 官方示例

services:
broker:
image: apache/kafka:3.8.0
hostname: broker
container_name: broker
ports:
- "9092:9092"
environment:
KAFKA_NODE_ID: 1
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: "CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT"
KAFKA_ADVERTISED_LISTENERS: "PLAINTEXT_HOST://localhost:9092,PLAINTEXT://broker:19092"
KAFKA_PROCESS_ROLES: "broker,controller"
KAFKA_CONTROLLER_QUORUM_VOTERS: "1@broker:29093"
KAFKA_LISTENERS: "CONTROLLER://:29093,PLAINTEXT_HOST://:9092,PLAINTEXT://:19092"
KAFKA_INTER_BROKER_LISTENER_NAME: "PLAINTEXT"
KAFKA_CONTROLLER_LISTENER_NAMES: "CONTROLLER"
CLUSTER_ID: "4L6g3nShT-eMCtK--X86sw"
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_LOG_DIRS: "/tmp/kraft-combined-logs"

创建主题

创建一个主题:

Terminal window
bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

这时候查看 Kafka 的日志,可以看到:

[2024-10-17 06:55:48,671] INFO Created log for partition quickstart-events-0 in /tmp/kraft-combined-logs/quickstart-events-0 with properties {} (kafka.log.LogManager)
[2024-10-17 06:55:48,673] INFO [Partition quickstart-events-0 broker=1] No checkpointed highwatermark is found for partition quickstart-events-0 (kafka.cluster.Partition)
[2024-10-17 06:55:48,673] INFO [Partition quickstart-events-0 broker=1] Log loaded for partition quickstart-events-0 with initial high watermark 0 (kafka.cluster.Partition)

查看主题描述:

Terminal window
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
Topic: quickstart-events TopicId: ucW__bDbT6OAlVfP1xLoJw PartitionCount: 1 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 1 Replicas: 1 Isr: 1 Elr: LastKnownElr:

写入事件

Kafka 客户端通过网络与 Kafka 代理进行通信,以写入(或读取)事件。一旦收到事件,代理将以持久且容错的方式存储事件,存储时间长短取决于您的需要,甚至是永久存储。

运行控制台生产者客户端,将一些事件写入主题。默认情况下,您输入的每一行都会导致将一个单独的事件写入主题。

Terminal window
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
>This is my first event
>This is my second event

Ctrl-C您可以随时停止生产者客户端。

读取事件

打开另一个终端会话并运行控制台消费者客户端来读取刚刚创建的事件:

Terminal window
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
This is my first event
This is my second event

Ctrl-C您可以随时停止消费者客户端。

由于事件持久存储在 Kafka 中,因此您可以根据需要多次读取它们,并让任意数量的消费者读取它们。您可以通过打开另一个终端会话并再次运行上一个命令来轻松验证这一点。

相关 IDEA 插件推荐

  • Zoolytic-Zookeeper tool
  • Kafkalytic