经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 程序设计 » Elasticsearch » 查看文章
如何高效实现 MySQL 与 elasticsearch 的数据同步
来源:cnblogs  作者:又拍云  时间:2023/3/17 9:00:50  对本文有异议

MySQL 自身简单、高效、可靠,是又拍云内部使用最广泛的数据库。但是当数据量达到一定程度的时候,对整个 MySQL 的操作会变得非常迟缓。而公司内部 robin/logs 表的数据量已经达到 800w,后续又有全文检索的需求。这个需求直接在 MySQL 上实施是难以做到的。

原数据库的同步问题

由于传统的 mysql 数据库并不擅长海量数据的检索,当数据量到达一定规模时(估算单表两千万左右),查询和插入的耗时会明显增加。同样,当需要对这些数据进行模糊查询或是数据分析时,MySQL作为事务型关系数据库很难提供良好的性能支持。使用适合的数据库来实现模糊查询是解决这个问题的关键。

但是,切换数据库会迎来两个问题,一是已有的服务对现在的 MySQL 重度依赖,二是 MySQL 的事务能力和软件生态仍然不可替代,直接迁移数据库的成本过大。我们综合考虑了下,决定同时使用多个数据库的方案,不同的数据库应用于不同的使用场景。而在支持模糊查询功能的数据库中,elasticsearch 自然是首选的查询数据库。这样后续对业务需求的切换也会非常灵活。

那具体该如何实现呢?在又拍云以往的项目中,也有遇到相似的问题。之前采用的方法是在业务中编写代码,然后同步到 elasticsearch 中。具体是这样实施的:每个系统编写特定的代码,修改 MySQL 数据库后,再将更新的数据直接推送到需要同步的数据库中,或推送到队列由消费程序来写入到数据库中。

但这个方案有一些明显的缺点:

  • 系统高耦合,侵入式代码,使得业务逻辑复杂度增加

  • 方案不通用,每一套同步都需要额外定制,不仅增加业务处理时间,还会提升软件复复杂度

  • 工作量和复杂度增加

在业务中编写同步方案,虽然在项目早期比较方便,但随着数据量和系统的发展壮大,往往最后会成为业务的大痛点。

解决思路及方案

调整架构

既然以往的方案有明显的缺点,那我们如何来解决它呢?优秀的解决方案往往是 “通过架构来解决问题“,那么能不能通过架构的思想来解决问题呢?

答案是可以的。我们可以将程序伪装成 “从数据库”,主库的增量变化会传递到从库,那这个伪装成 “从数据库” 的程序就能实时获取到数据变化,然后将增量的变化推送到消息队列 MQ,后续消费者消耗 MQ 的数据,然后经过处理之后再推送到各自需要的数据库。

这个架构的核心是通过监听 MySQL 的 binlog 来同步增量数据,通过基于 query 的查询旧表来同步旧数据,这就是本文要讲的一种异构数据库同步的实践。

改进数据库

经过深度的调研,成功得到了一套异构数据库同步方案,并且成功将公司生产环境下的 robin/logs 的表同步到了 elasticsearch 上。

首先对 MySQL 开启 binlog,但是由于 maxwell 需要的 binlog_format=row 原本的生产环境的数据库不宜修改。这里请教了海杨前辈,他提供了”从库联级“的思路,在从库中监听 binlog 绕过了操作生产环境重启主库的操作,大大降低了系统风险。

后续操作比较顺利,启动 maxwell 监听从库变化,然后将增量变化推送到 kafka ,最后配置 logstash 消费 kafka中的数据变化事件信息,将结果推送到 elasticsearch。配置 logstash需要结合表结构,这是整套方案实施的重点。

这套方案使用到了kafka、maxwell、logstash、elasticsearch。其中 elasticsearch 与 kafka已经在生产环境中有部署,所以无需单独部署维护。而 logstash 与 maxwell 只需要修改配置文件和启动命令即可快速上线。整套方案的意义不仅在于成本低,而且可以大规模使用,公司内有 MySQL 同步到其它数据库的需求时,都可以上任。

成果展示前后对比

使用该方案同步和业务实现同步的对比

写入到 elasticsearch 性能对比 (8核4G内存)

经过对比测试,800w 数据量全量同步,使用 logstash 写到 elasticsearch,实际需要大概 3 小时,而旧方案的写入时间需要 2.5 天。

方案实施细节

接下来,我们来看看具体是如何实现的。

本方案无需编写额外代码,非侵入式的,实现 MySQL 数据与 elasticsearch 数据库的同步。

下列是本次方案需要使用所有的组件:

  • MySQL

  • Kafka

  • Maxwell(监听 binlog)

  • Logstash(将数据同步给 elasticsearch)

  • Elasticsearch

1. MySQL配置

本次使用 MySQL 5.5 作示范,其他版本的配置可能稍许不同需要

首先我们需要增加一个数据库只读的用户,如果已有的可以跳过。

  1. -- 创建一个 用户名为 maxwell 密码为 xxxxxx 的用户
  2. CREATE USER 'maxwell'@'%' IDENTIFIED BY 'XXXXXX';
  3. GRANT ALL ON maxwell.* TO 'maxwell'@'localhost';
  4. GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO 'maxwell'@'%';

开启数据库的 binlog,修改 mysql 配置文件,注意 maxwell 需要的 binlog 格式必须是row

  1. # /etc/mysql/my.cnf
  2. [mysqld]
  3. # maxwell 需要的 binlog 格式必须是 row
  4. binlog_format=row
  5. # 指定 server_id 此配置关系到主从同步需要按情况设置,
  6. # 由于此mysql没有开启主从同步,这边默认设置为 1
  7. server_id=1
  8. # logbin 输出的文件名, 按需配置
  9. log-bin=master

重启 MySQL 并查看配置是否生效:

  1. sudo systemctl restart mysqld
  1. select @@log_bin;
  2. -- 正确结果是 1
  3. select @@binlog_format;
  4. -- 正确结果是 ROW

如果要监听的数据库开启了主从同步,并且不是主数据库,需要再从数据库开启 binlog 联级同步。

  1. # /etc/my.cnf
  2. log_slave_updates = 1

需要被同步到 elasticsearch 的表结构。

  1. -- robin.logs
  2. show create table robin.logs;
  3. -- 表结构
  4. CREATE TABLE `logs` (
  5. `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  6. `content` text NOT NULL,
  7. `user_id` int(11) NOT NULL,
  8. `status` enum('SUCCESS','FAILED','PROCESSING') NOT NULL,
  9. `type` varchar(20) DEFAULT '',
  10. `meta` text,
  11. `created_at` bigint(15) NOT NULL,
  12. `idx_host` varchar(255) DEFAULT '',
  13. `idx_domain_id` int(11) unsigned DEFAULT NULL,
  14. `idx_record_value` varchar(255) DEFAULT '',
  15. `idx_record_opt` enum('DELETE','ENABLED','DISABLED') DEFAULT NULL,
  16. `idx_orig_record_value` varchar(255) DEFAULT '',
  17. PRIMARY KEY (`id`),
  18. KEY `created_at` (`created_at`)
  19. ) ENGINE=InnoDB AUTO_INCREMENT=8170697 DEFAULT CHARSET=utf8

2. Maxwell 配置

本次使用 maxwell-1.39.2 作示范, 确保机器中包含 java 环境, 推荐 openjdk11

下载 maxwell 程序

  1. wget https://github.com/zendesk/maxwell/releases/download/v1.39.2/maxwell-1.39.2.tar.gz
  2. tar zxvf maxwell-1.39.2.tar.gz **&&** cd maxwell-1.39.2

maxwell 使用了两个数据库:

  • 一个是需要被监听binlog的数据库(只需要读权限)

  • 另一个是记录maxwell服务状态的数据库,当前这两个数据库可以是同一个

重要参数说明:

  • host 需要监听binlog的数据库地址

  • port 需要监听binlog的数据库端口

  • user 需要监听binlog的数据库用户名

  • password 需要监听binlog的密码

  • replication_host 记录maxwell服务的数据库地址

  • replication_port 记录maxwell服务的数据库端口

  • replication_user 记录maxwell服务的数据库用户名

  • filter 用于监听binlog数据时过滤不需要的数据库数据或指定需要的数据库

  • producer 将监听到的增量变化数据提交给的消费者 (如 stdout、kafka)

  • kafka.bootstrap.servers kafka 服务地址

  • kafka_version kafka 版本

  • kafka_topic 推送到kafka的主题

启动 maxwell

注意,如果 kafka 配置了禁止自动创建主题,需要先自行在 kafka 上创建主题,kafka_version 需要根据情况指定, 此次使用了两张不同的库

  1. ./bin/maxwell
  2. --host=mysql-maxwell.mysql.svc.cluster.fud3
  3. --port=3306
  4. --user=root
  5. --password=password
  6. --replication_host=192.168.5.38
  7. --replication_port=3306
  8. --replication_user=cloner
  9. --replication_password=password
  10. --filter='exclude: *.*, include: robin.logs'
  11. --producer=kafka
  12. --kafka.bootstrap.servers=192.168.30.10:9092
  13. --kafka_topic=maxwell-robinlogs --kafka_version=0.9.0.1

3. 安装 Logstash

Logstash 包中已经包含了 openjdk,无需额外安装。

  1. wget https://artifacts.elastic.co/downloads/logstash/logstash-8.5.0-linux-x86_64.tar.gz
  2. tar zxvf logstash-8.5.0-linux-x86_64.tar.gz

删除不需要的配置文件。

  1. rm config/logstash.yml

修改 logstash 配置文件,此处语法参考官方文档(https://www.elastic.co/guide/en/logstash/current/input-plugins.html)

  1. # config/logstash-sample.conf
  2. input {
  3. kafka {
  4. bootstrap_servers => "192.168.30.10:9092"
  5. group_id => "main"
  6. topics => ["maxwell-robinlogs"]
  7. }
  8. }
  9. filter {
  10. json {
  11. source => "message"
  12. }
  13. # 将maxwell的事件类型转化为es的事件类型
  14. # 如增加 -> index 修改-> update
  15. translate {
  16. source => "[type]"
  17. target => "[action]"
  18. dictionary => {
  19. "insert" => "index"
  20. "bootstrap-insert" => "index"
  21. "update" => "update"
  22. "delete" => "delete"
  23. }
  24. fallback => "unknown"
  25. }
  26. # 过滤无效的数据
  27. if ([action] == "unknown") {
  28. drop {}
  29. }
  30. # 处理数据格式
  31. if [data][idx_host] {
  32. mutate {
  33. add_field => { "idx_host" => "%{[data][idx_host]}" }
  34. }
  35. } else {
  36. mutate {
  37. add_field => { "idx_host" => "" }
  38. }
  39. }
  40. if [data][idx_domain_id] {
  41. mutate {
  42. add_field => { "idx_domain_id" => "%{[data][idx_domain_id]}" }
  43. }
  44. } else {
  45. mutate {
  46. add_field => { "idx_domain_id" => "" }
  47. }
  48. }
  49. if [data][idx_record_value] {
  50. mutate {
  51. add_field => { "idx_record_value" => "%{[data][idx_record_value]}" }
  52. }
  53. } else {
  54. mutate {
  55. add_field => { "idx_record_value" => "" }
  56. }
  57. }
  58. if [data][idx_record_opt] {
  59. mutate {
  60. add_field => { "idx_record_opt" => "%{[data][idx_record_opt]}" }
  61. }
  62. } else {
  63. mutate {
  64. add_field => { "idx_record_opt" => "" }
  65. }
  66. }
  67. if [data][idx_orig_record_value] {
  68. mutate {
  69. add_field => { "idx_orig_record_value" => "%{[data][idx_orig_record_value]}" }
  70. }
  71. } else {
  72. mutate {
  73. add_field => { "idx_orig_record_value" => "" }
  74. }
  75. }
  76. if [data][type] {
  77. mutate {
  78. replace => { "type" => "%{[data][type]}" }
  79. }
  80. } else {
  81. mutate {
  82. replace => { "type" => "" }
  83. }
  84. }
  85. mutate {
  86. add_field => {
  87. "id" => "%{[data][id]}"
  88. "content" => "%{[data][content]}"
  89. "user_id" => "%{[data][user_id]}"
  90. "status" => "%{[data][status]}"
  91. "meta" => "%{[data][meta]}"
  92. "created_at" => "%{[data][created_at]}"
  93. }
  94. remove_field => ["data"]
  95. }
  96. mutate {
  97. convert => {
  98. "id" => "integer"
  99. "user_id" => "integer"
  100. "idx_domain_id" => "integer"
  101. "created_at" => "integer"
  102. }
  103. }
  104. # 只提炼需要的字段
  105. mutate {
  106. remove_field => [
  107. "message",
  108. "original",
  109. "@version",
  110. "@timestamp",
  111. "event",
  112. "database",
  113. "table",
  114. "ts",
  115. "xid",
  116. "commit",
  117. "tags"
  118. ]
  119. }
  120. }
  121. output {
  122. # 结果写到es
  123. elasticsearch {
  124. hosts => ["http://es-zico2.service.upyun:9500"]
  125. index => "robin_logs"
  126. action => "%{action}"
  127. document_id => "%{id}"
  128. document_type => "robin_logs"
  129. }
  130. # 结果打印到标准输出
  131. stdout {
  132. codec => rubydebug
  133. }
  134. }

执行程序:

  1. # 测试配置文件*
  2. bin/logstash -f config/logstash-sample.conf --config.test_and_exit
  3. # 启动*
  4. bin/logstash -f config/logstash-sample.conf --config.reload.automatic

4. 全量同步

完成启动后,后续的增量数据 maxwell 会自动推送给 logstash 最终推送到 elasticsearch ,而之前的旧数据可以通过 maxwell 的 bootstrap 来同步,往下面表中插入一条任务,那么 maxwell 会自动将所有符合条件的 where_clause 的数据推送更新。

  1. INSERT INTO maxwell.bootstrap
  2. ( database_name, table_name, where_clause, client_id )
  3. values
  4. ( 'robin', 'logs', 'id > 1', 'maxwell' );

后续可以在 elasticsearch 检测数据是否同步完成,可以先查看数量是否一致,然后抽样对比详细数据。

  1. # 检测 elasticsearch 中的数据量
  2. GET robin_logs/robin_logs/_count

原文链接:https://www.cnblogs.com/upyun/p/17217520.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号