Kafka实践

前言

在上一篇文章,我们介绍的Kafka的基本概念和原理。我们尝试搭建一个Kafka的集群,并尝试生产和消费消息。在消费消息时,需要注意PHP的扩展rdkafka的消费接口不同以及消息ACK的机制。在Kafka集群中,有一个很重要的组件是zookeeper,下面我们会了解下什么是Zookeeper。

Zookeeper

Zookeeper是Apache的顶级项目,作为一个中心服务,用于维护命名和配置数据,并在分布式系统中提供灵活和健壮的数据同步。Zookeeper跟踪Kafka集群节点的状态、主题、分区等信息。Zookeeper允许多个客户端同时进行读写操作,并充当系统内的共享配置服务。

那Zeekeeper是如何工作的呢?Zookeeper将数据划分到多个节点集合中,这就是它实现高可用性和一致性的方法。如果有一个节点失败,Zookeeper可以执行即时故障转移,例如,如果主节点出现失败,则通过集合内的轮询实时选择一个新节点。如果第一个节点无法响应,连接服务器的客户端可以查询另外一个节点。

为什么Zookeeper对Apache Kafka来说是必须的?

在Kafka里,Zookeeper扮演着很重要的角色。有如下几个功能:

  1. 控制选举。控制者是Kafka生态中最重要的代理实体之一,负责维护所有分区之间的领导者与跟随者的关系。如果领导者节点挂掉,控制者负责在所有分区选举出新的领导者。
  2. 主题配置。保存着所有的主题配置,列表,分区数量,所有的副本位置,所有主题的配置,节点的首选领导等。
  3. 访问控制列表。Zookeeper维护着所有主题的访问控制列表。
  4. 集群的成员。Zeekeeper还维护着实时正在运行的所有代理列表。

消息消费

在PHP的扩展中,rdkafka实现了两种消费接口,High-level Consumer和Low-level Consumer。

接口 实现 备注
High-level Consumer 自动分区分配和撤销,自动管理offset
Low-level Consumer 手动维护分区,offset

消息ACK

Kafka消息可能会丢失的情况,可能会出现在消息发送和消息消费。Kafka通过配置request.required.acks属性是否进行消息确认。

配置 性能 内部实现 备注
0 不进行消息接受是否成功的确认
1 当Leader接受成功才确认
-1 当Leader和Follower都接受成功才确认

Kafka通过配置producer.type属性来决定消息的同步或异步发送。

综上,可能会有两种场景会丢失消息:

  1. acks = 0,不进行消息接受确认,网络异常、磁盘异常等会导致消息丢失。
  2. acks = 1,同步模式下,只有Leader确认接受成功但Leader由于异常挂掉了,副本同步失败,可能会造成消息丢失。

如何搭建Kafka服务

从Github上下载Kafka的Dockerfile

cd dev
git clone git@github.com:wurstmeister/kafka-docker.git

获取当前Ubuntu服务器的ip地址

ifconfig | grep -Eo 'inet (addr:)?([0-9]*\.){3}[0-9]*' | grep -Eo '([0-9]*\.){3}[0-9]*' | grep -v '127.0.0.1'|grep -v '172*'| tail -1
192.168.1.3

替换版本库中的KAFKA_ADVERTISED_HOST_NAME

cd kafka-docker
sed -i 's/192.168.99.100/192.168.1.3/g' docker-compose.yml

启动整个集群

docker-compose up -d
Creating kafkadocker_kafka_1 ... 
Creating kafkadocker_zookeeper_1 ... 
Creating kafkadocker_kafka_1
Creating kafkadocker_kafka_1 ... done

经过漫长的等待,执行完成,可以看到启动了2个docker服务,分别以kafkadocker_kafka和wurstmeister/zookeeper为镜像。

docker ps

kafkadocker_kafka        "start-kafka.sh"       0.0.0.0:32768->9092/tcp kafkadocker_kafka_1
wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1

查看kafka的信息

./start-kafka-shell.sh 192.168.1.3  192.168.1.3:2181
# 进入docker环境后有4个脚本
bash-4.4# ls /usr/bin/ | grep '\.sh'
broker-list.sh 查看代理列表
create-topics.sh 创建主题
start-kafka.sh 启动kafka
versions.sh 显示版本

创建topics test

# 查看topics
kafka-topics.sh --list --zookeeper $ZK
# 创建一个topics test
kafka-topics.sh --create --topic test \
--partitions 4 --zookeeper $ZK --replication-factor 1
bash-4.4# kafka-topics.sh --list --zookeeper $ZK
test
# 显示topic test的相关信息
kafka-topics.sh --describe --topic test --zookeeper $ZK
Topic:test  PartitionCount:4    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
    Topic: test Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
    Topic: test Partition: 2    Leader: 1001    Replicas: 1001  Isr: 1001
    Topic: test Partition: 3    Leader: 1001    Replicas: 1001  Isr: 1001

生产消息

kafka-console-producer.sh --topic=test --broker-list=`broker-list.sh`
# 输入msg后回车即可

消费消息

# 启动另外一个终端进行消费
./start-kafka-shell.sh 192.168.1.3  192.168.1.3:2181
# 进入docker 后在命令行输入
kafka-console-consumer.sh --topic=test --bootstrap-server `broker-list.sh`
# 收到消息后在直接显示出来

查看zookeeper信息

# 进入zookeeper shell
zookeeper-shell.sh $ZK
# 可查看相关帮助文档
help 
# 查看topics
ls /brokers/topics

遇到的问题

创建topics失败

kafka-topics.sh --create --topic test --partitions 4 --zookeeper $ZK --replication-factor 2
Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2019-02-23 12:45:42,619] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
 (kafka.admin.TopicCommand$)

出现这个错误,提示brokers的数量较少,replication-factor 复制因子过大,如果只有一个broker,改为1即可。

消费失败

kafka-console-consumer.sh --topic=test --zookeeper=$ZK
# 出现报错
zookeeper is not a recognized option

最新版本的consumer去掉了这个选项,添加额外的选项即可。

kafka-console-consumer.sh --topic=test --bootstrap-server `broker-list.sh`

参考链接

  1. Dockerfile for Apache Kafka
  2. Run multiple Kafka brokers in Docker
  3. Rdkafka doc
  4. What is Zookeeper and why is it needed for Apache Kafka?
  5. kafka 数据可靠性深度解读
  6. Kafka High Availability

发表评论

电子邮件地址不会被公开。 必填项已用*标注