Kafka实践

前言

在上一篇文章,我们介绍的Kafka的基本概念和原理。我们尝试搭建一个Kafka的集群,并尝试生产和消费消息。在消费消息时,需要注意PHP的扩展rdkafka的消费接口不同以及消息ACK的机制。在Kafka集群中,有一个很重要的组件是zookeeper,下面我们会了解下什么是Zookeeper。

Zookeeper

Zookeeper是Apache的顶级项目,作为一个中心服务,用于维护命名和配置数据,并在分布式系统中提供灵活和健壮的数据同步。Zookeeper跟踪Kafka集群节点的状态、主题、分区等信息。Zookeeper允许多个客户端同时进行读写操作,并充当系统内的共享配置服务。

那Zeekeeper是如何工作的呢?Zookeeper将数据划分到多个节点集合中,这就是它实现高可用性和一致性的方法。如果有一个节点失败,Zookeeper可以执行即时故障转移,例如,如果主节点出现失败,则通过集合内的轮询实时选择一个新节点。如果第一个节点无法响应,连接服务器的客户端可以查询另外一个节点。

为什么Zookeeper对Apache Kafka来说是必须的?

在Kafka里,Zookeeper扮演着很重要的角色。有如下几个功能:

  1. 控制选举。控制者是Kafka生态中最重要的代理实体之一,负责维护所有分区之间的领导者与跟随者的关系。如果领导者节点挂掉,控制者负责在所有分区选举出新的领导者。
  2. 主题配置。保存着所有的主题配置,列表,分区数量,所有的副本位置,所有主题的配置,节点的首选领导等。
  3. 访问控制列表。Zookeeper维护着所有主题的访问控制列表。
  4. 集群的成员。Zeekeeper还维护着实时正在运行的所有代理列表。

消息消费

在PHP的扩展中,rdkafka实现了两种消费接口,High-level Consumer和Low-level Consumer。

接口 实现 备注
High-level Consumer 自动分区分配和撤销,自动管理offset
Low-level Consumer 手动维护分区,offset

消息ACK

Kafka消息可能会丢失的情况,可能会出现在消息发送和消息消费。Kafka通过配置request.required.acks属性是否进行消息确认。

配置 性能 内部实现 备注
0 不进行消息接受是否成功的确认
1 当Leader接受成功才确认
-1 当Leader和Follower都接受成功才确认

Kafka通过配置producer.type属性来决定消息的同步或异步发送。

综上,可能会有两种场景会丢失消息:

  1. acks = 0,不进行消息接受确认,网络异常、磁盘异常等会导致消息丢失。
  2. acks = 1,同步模式下,只有Leader确认接受成功但Leader由于异常挂掉了,副本同步失败,可能会造成消息丢失。

如何搭建Kafka服务

从Github上下载Kafka的Dockerfile

cd dev
git clone git@github.com:wurstmeister/kafka-docker.git

获取当前Ubuntu服务器的ip地址

ifconfig | grep -Eo 'inet (addr:)?([0-9]*\.){3}[0-9]*' | grep -Eo '([0-9]*\.){3}[0-9]*' | grep -v '127.0.0.1'|grep -v '172*'| tail -1
192.168.1.3

替换版本库中的KAFKA_ADVERTISED_HOST_NAME

cd kafka-docker
sed -i 's/192.168.99.100/192.168.1.3/g' docker-compose.yml

启动整个集群

docker-compose up -d
Creating kafkadocker_kafka_1 ... 
Creating kafkadocker_zookeeper_1 ... 
Creating kafkadocker_kafka_1
Creating kafkadocker_kafka_1 ... done

经过漫长的等待,执行完成,可以看到启动了2个docker服务,分别以kafkadocker_kafka和wurstmeister/zookeeper为镜像。

docker ps

kafkadocker_kafka        "start-kafka.sh"       0.0.0.0:32768->9092/tcp kafkadocker_kafka_1
wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1

查看kafka的信息

./start-kafka-shell.sh 192.168.1.3  192.168.1.3:2181
# 进入docker环境后有4个脚本
bash-4.4# ls /usr/bin/ | grep '\.sh'
broker-list.sh 查看代理列表
create-topics.sh 创建主题
start-kafka.sh 启动kafka
versions.sh 显示版本

创建topics test

# 查看topics
kafka-topics.sh --list --zookeeper $ZK
# 创建一个topics test
kafka-topics.sh --create --topic test \
--partitions 4 --zookeeper $ZK --replication-factor 1
bash-4.4# kafka-topics.sh --list --zookeeper $ZK
test
# 显示topic test的相关信息
kafka-topics.sh --describe --topic test --zookeeper $ZK
Topic:test  PartitionCount:4    ReplicationFactor:1 Configs:
    Topic: test Partition: 0    Leader: 1001    Replicas: 1001  Isr: 1001
    Topic: test Partition: 1    Leader: 1001    Replicas: 1001  Isr: 1001
    Topic: test Partition: 2    Leader: 1001    Replicas: 1001  Isr: 1001
    Topic: test Partition: 3    Leader: 1001    Replicas: 1001  Isr: 1001

生产消息

kafka-console-producer.sh --topic=test --broker-list=`broker-list.sh`
# 输入msg后回车即可

消费消息

# 启动另外一个终端进行消费
./start-kafka-shell.sh 192.168.1.3  192.168.1.3:2181
# 进入docker 后在命令行输入
kafka-console-consumer.sh --topic=test --bootstrap-server `broker-list.sh`
# 收到消息后在直接显示出来

查看zookeeper信息

# 进入zookeeper shell
zookeeper-shell.sh $ZK
# 可查看相关帮助文档
help 
# 查看topics
ls /brokers/topics

遇到的问题

创建topics失败

kafka-topics.sh --create --topic test --partitions 4 --zookeeper $ZK --replication-factor 2
Error while executing topic command : Replication factor: 2 larger than available brokers: 1.
[2019-02-23 12:45:42,619] ERROR org.apache.kafka.common.errors.InvalidReplicationFactorException: Replication factor: 2 larger than available brokers: 1.
 (kafka.admin.TopicCommand$)

出现这个错误,提示brokers的数量较少,replication-factor 复制因子过大,如果只有一个broker,改为1即可。

消费失败

kafka-console-consumer.sh --topic=test --zookeeper=$ZK
# 出现报错
zookeeper is not a recognized option

最新版本的consumer去掉了这个选项,添加额外的选项即可。

kafka-console-consumer.sh --topic=test --bootstrap-server `broker-list.sh`

参考链接

  1. Dockerfile for Apache Kafka
  2. Run multiple Kafka brokers in Docker
  3. Rdkafka doc
  4. What is Zookeeper and why is it needed for Apache Kafka?
  5. kafka 数据可靠性深度解读
  6. Kafka High Availability

MySQL实践

前言

在上一篇文章,我们整理总结MySQL的常用知识点和MySQL的复制相关概念,在这篇文章,我们尝试搭建一个MySQL的主从服务。

主从架构

我们从github上拉取MySQL主从复制的版本库,直接尝试进行构建。该版本库构建了主从服务,定义了对数据库mydb的基于行的复制。

cd dev
git clone git@github.com:vbabak/docker-mysql-master-slave.git
./build.sh

# 执行成功后会显示
               Slave_IO_State: Waiting for master to send event
              Master_Log_File: mysql-bin.000003
          Read_Master_Log_Pos: 600
               Relay_Log_File: mysql-relay-bin.000002
                Relay_Log_Pos: 320
        Relay_Master_Log_File: mysql-bin.000003
             Slave_IO_Running: Yes
            Slave_SQL_Running: Yes
      Slave_SQL_Running_State: Slave has read all relay log; waiting for more updates

查看构建日志

docker-compose logs

查看当前进程

docker-compose ps

    Name                 Command             State                      Ports                   
------------------------------------------------------------------------------------------------
mysql_master   docker-entrypoint.sh mysqld   Up      3306/tcp, 33060/tcp, 0.0.0.0:4406->4406/tcp
mysql_slave    docker-entrypoint.sh mysqld   Up      3306/tcp, 33060/tcp, 0.0.0.0:5506->5506/tcp

查看master的状态

docker exec mysql_master sh -c 'mysql -u root -p111 -e "SHOW MASTER STATUS \G"'
mysql: [Warning] Using a password on the command line interface can be insecure.
*************************** 1. row ***************************
             File: mysql-bin.000003
         Position: 600
     Binlog_Do_DB: mydb
 Binlog_Ignore_DB: 
Executed_Gtid_Set:

查看slave的状态

docker exec mysql_slave sh -c 'mysql -u root -p111 -e "SHOW SLAVE STATUS \G"'

Slave_IO_State: Waiting for master to send event
                  Master_Host: 172.19.0.2
                  Master_User: mydb_slave_user
                  Master_Port: 3306
                Connect_Retry: 60
              Master_Log_File: mysql-bin.000003
          Read_Master_Log_Pos: 600
               Relay_Log_File: mysql-relay-bin.000002
                Relay_Log_Pos: 320
        Relay_Master_Log_File: mysql-bin.000003
             Slave_IO_Running: Yes
            Slave_SQL_Running: Yes

在master上创建一个dev数据库,在slave上查看是否同步成功。

# 查看数据库
docker exec mysql_slave sh -c 'mysql -u root -p111 -e "SHOW DATABASES \G"'
docker exec mysql_master sh -c 'mysql -u root -p111 -e "SHOW DATABASES \G"'

# 查看记录
docker exec mysql_master sh -c "export MYSQL_PWD=111; mysql -u root mydb -e 'create table code(code int); insert into code values (100), (200)'"

# 查看master的状态
docker exec mysql_master sh -c "export MYSQL_PWD=111; mysql -u root mydb -e 'select * from code \G'"
docker exec mysql_master sh -c 'mysql -u root -p111 -e "SHOW DATABASES \G"'
docker exec mysql_master sh -c 'mysql -u root -p111 -e "SHOW MASTER STATUS \G"'

# 查看同步结果 slave状态
docker exec mysql_slave sh -c "export MYSQL_PWD=111; mysql -u root mydb -e 'select * from code \G'"
docker exec mysql_slave sh -c 'mysql -u root -p111 -e "SHOW DATABASES \G"'
docker exec mysql_slave sh -c 'mysql -u root -p111 -e "SHOW SLAVE STATUS \G"'

在这里,我们可以看到两条数据已经完整的同步过来。

*************************** 1. row ***************************
code: 100
*************************** 2. row ***************************
code: 200

参考链接

  1. Docker MySQL master-slave replication
  2. Mysql Master/Slave Replication With Docker

MySQL复制

前言

在上一篇文章,我们整理总结MySQL的常用知识点,在这里,我们尝试学习MySQL复制的相关概念。

什么是MySQL的复制

MySQL的复制允许将主服务器的数据复制到一个或多个从服务器中。默认情况下,复制是异步的。从服务器不需要永久连接就可以从主服务器获取最新的数据。根据配置,可以在数据库中复制所有的数据库,选定数据库,甚至是选定的表。

为什么要实现MySQL复制

MySQL的复制有以下优点:

  1. 在多个从服务器中做负载均衡提升性能
  2. 数据安全性,可以在多个从服务器复制数据,而不会影响主服务器。
  3. 离线数据分析,可以在从服务器上做实时数据分析。
  4. 远程数据分发,为网站创建本地的副本。

MySQL复制方式

MySQL支持不同的复制方式。传统的方方式是基于从主二进制日志记录和位置进行同步。新的基于全局事务标示符GTIDs(Global Transaction Identifiers 是事务性的,因此不需要关注日志文件或位置,极大地简化了许多常见的复制任务。

同步 vs 半同步

MySQL的复制支持不同类型的同步。最初的同步是单向异步复制,也就是说一个服务器充当主服务器,一个或多个服务器充当从服务器。

异步复制的机制是主服务器将事件写入其二进制日志,但不知道从服务器是否或合适检索和处理这些事件。使用异步复制是,如果主服务器出现奔溃,它提交的事务可能不会传输到任何从服务器。因此,在这种情况下,把一个从服务器提升为主服务器,可能会缺少奔溃的主服务器丢失的部分事务。

半同步复制,可用做异步复制的升级版:

  1. 从服务器链接到主服务器时会显示是否具有半同步的能力。
  2. 如果主服务器启用了半同步复制,并且至少有一个半同步从服务器,那么在主服务器上执行事务提交的线程,会等待至少一个从服务器确认接受到事务的全部事件,或者直到超时发生。
  3. 只有将事件写入其中继日志并刷新到磁盘,从服务器才会确认收到事务的事件。
  4. 如果在没有任何从服务器确认事务的情况下发生超时,则主服务器将恢复为异步复制。当至少有一个版同步从服务器赶上时,主服务器将恢复版同步复制。
  5. 必须在主服务器和从服务器都启用版同步复制。如果在主服务器上禁用了版同步复制,或者在主服务器上启用了但没有在从服务器上启用半同步复制,则主服务器使用同步复制。

配置参数

server-id

指定服务器ID.当启用二进制日志记录时,需要为每一个服务器设置一个唯一的服务器ID,范围为1到232 − 1。唯一意味着每个ID必须与任何其他复制主或者从服务器使用的ID不同。

如果设置0 ,则意味着主服务器拒绝从服务器连接,从服务器会拒绝连接到主服务器。

binlog-do-db

设置需要记录二进制日志的数据库。多个数据库,需要设置多行记录,由于MySQL的数据库名称可以包含逗号,因此带有逗号分割列表,被认为是一个数据库。

binlog-ignore-db

设置不需要二进制日记记录的数据库。

日志

MySQL自定义较多的日志概念,这里尝试整理总结下。

binary log

二进制日志包含描述数据库更改,如表创建操作或表数据更改的事件。它包含可能进行更改的语句事件,还包含有关每个语句花费更新数据的时间信息。二进制日志有2个重要的目的:

  1. 对于复制,主复制服务器上的二进制日志提供要发送到从服务器的数据更改记录。
  2. 部分数据恢复操作需要使用二进制日志。

relay log

中继日志由从主服务器的二进制日志读取并由I/O线程写入的事件构成。中继日志中的事件作为SQL线程的一部分在从服务器上执行。

undo log

撤销日志,用于数据一致性读取,或者用于事务的回滚和撤销。存在与撤销表空间,保存由当前事务修改的数据副本的存储区域。如果另外一个事务需要作为一致性读取操作的一部分,则返回该区域检索到的未经修改的数据。

redo log

在服务器奔溃恢复期间使用的一种基于磁盘的数据结构,用于更正由不完全事务写入的数据。在意外之前没有完成的数据文件更新的修改将自动重放。

参考链接

  1. MySQL Replication
  2. MySQL Semisynchronous Replication
  3. Server System Variables
  4. The Binary Log