开源数据同步神器——canal

2019/10/29 Canal 共 2345 字,约 7 分钟
山川尽美

本文转自:IT 米粉

前言

如今大型的 IT 系统中,都会使用分布式的方式,同时会有非常多的中间件,如 redis、消息队列、大数据存储等,但是实际核心的数据存储依然是存储在数据库,作为使用最广泛的数据库,如何将 mysql 的数据与中间件的数据进行同步,既能确保数据的一致性、及时性,也能做到代码无侵入的方式呢?如果有这样的一个需求,数据修改后,需要及时的将 mysql 中的数据更新到 elasticsearch,我们会怎么进行实现呢?

数据同步方案选择

针对上文的需求,经过思考,初步有如下的一些方案:

  • 代码实现

    针对代码中进行数据库的增删改操作时,同时进行 elasticsearch 的增删改操作。

  • mybatis 实现

    通过 mybatis plugin 进行实现,截取 sql 语句进行分析, 针对 insert、update、delete 的语句进行处理。显然,这些操作如果都是单条数据的操作,是很容易处理的。但是,实际开发中,总是会有一些批量的更新或者删除操作,这时候,就很难进行处理了。

  • Aop 实现

    不管是通过哪种 Aop 方式,根据制定的规则,如规范方法名,注解等进行切面处理,但依然还是会出现无法处理批量操作数据的问题。

  • logstash

    logstash 类似的同步组件提供的文件和数据同步的功能,可以进行数据的同步,只需要简单的配置就能将 mysql 数据同步到 elasticsearch,但是 logstash 的原理是每秒进行一次增量数据查询,将结果同步到 elasticsearch ,实时性要求特别高的,可能无法满足要求。且此方案的性能不是很好,造成资源的浪费。

那么是否有什么更好的方式进行处理吗?mysql binlog 同步,实时性强,对于应用无任何侵入性,且性能更好,不会造成资源浪费,那么就有了我今天的主角——canal

canal

介绍

canal 是阿里巴巴的一个开源项目,基于 java 实现,整体已经在很多大型的互联网项目生产环境中使用,包括阿里、美团等都有广泛的应用,是一个非常成熟的数据库同步方案,基础的使用只需要进行简单的配置即可。 canal 是通过模拟成为 mysql 的 slave 的方式,监听 mysql 的 binlog 日志来获取数据,binlog 设置为 row 模式以后,不仅能获取到执行的每一个增删改的脚本,同时还能获取到修改前和修改后的数据,基于这个特性,canal 就能高性能的获取到 mysql 数据数据的变更。

使用

canal 的介绍在官网有非常详细的说明,如果想了解更多,大家可以移步官网(https://github.com/alibaba/canal)了解。我这里补充下使用中不太容易理解部分。 canal 的部署主要分为 server 端和 client 端。 server 端部署好以后,可以直接监听 mysql binlog,因为 server 端是把自己模拟成了 mysql slave,所以,只能接受数据,没有进行任何逻辑的处理,具体的逻辑处理,需要 client 端进行处理。 client 端一般是需要大家进行简单的开发。https://github.com/alibaba/canal/wiki/ClientAPI 有一个简单的示例,很容易理解。

canal Adapter

为了便于大家的使用,官方做了一个独立的组件 Adapter,Adapter 是可以将 canal server 端获取的数据转换成几个常用的中间件数据源,现在支持 kafka、rocketmq、hbase、elasticsearch,针对这几个中间件的支持,直接配置即可,无需开发。上文中,如果需要将 mysql 的数据同步到 elasticsearch,直接运行 canal Adapter,修改相关的配置即可。

### 常见问题 - 无法接收到数据,程序也没有报错? 一定要确保 mysql 的 binlog 模式为 row 模式,canal 原理是解析 Binlog 文件,并且直接从文件中获取数据的。

- Adapter 使用无法同步数据? 按照官方文档,检查配置项,如 sql 的大小写,字段的大小写可能都会有影响,如果还无法搞定,可以自己获取代码调试下,Adapter 的代码还是比较容易看懂的。

canal Adapter elasticsearch 改造

因为有了 canal 和 canal Adapter 这个神器,同步到 elasticsearch、hbase 等问题都解决了,但是自己的开发的过程中发现,Adapter 使用还是有些问题,因为先使用的是 elasticsearch 同步功能,所以对 elasticsearch 进行了一些改造:

### elasticsearch 初始化 一个全新的 elasticsearch 无法使用,因为没有创建 elasticsearch index 和 mapping,增加了对应的功能。

elasticsearch 配置文件 mapping 节点增加两个参数:

enablefieldmap: true
  fieldmap:
    id: "text"
    name: "text"
    c_time: "text"

enablefieldmap 是否需要自动生成 fieldmap,默认为 false,如果需要启动的时候就生成这设置为 true,并且设置 fieldmap,类似 elasticsearch mapping 中每个字段的类型。

esconfig bug 处理

代码中获取 binlog 的日志处理时,必须要获取数据库名,但是当获取 binlog 为 type query 时,是无法获取数据库名的,此处有 bug,导致出现 “Outer adapter write failed” ,且未输出错误日志,修复此 bug.

后续计划

增加 rabbit MQ 的支持 增加 redis 的支持

源码

源码地址:https://github.com/itmifen/canal

文档信息

Search

    Table of Contents