Skip to content

2.1_MySql Reader

elevenqq edited this page Sep 29, 2018 · 2 revisions

一、使用场景

Mysql-Reader插件用于Mysql数据库增量日志解析和数据模型转换,适用于需要实时同步mysql数据的场景,例如:EDA、CQRS、BigData、数据库升级、迁移与备份,以及业务缓存刷新等。

二、设计原理


如上图所示,Mysql-Reader内部处理流程主要分为两个阶段:

  1. 获取Binlog。MysqlReader集成了阿里开源的Canal组件来采集binlog,CanalServer的生命周期和Reader保持一致,CanalServer启动之后会模拟mysql slave向mysql master发送dump请求,mysql master收到dump请求之后会向canal推送Binlog,Canal内部将binlog事件解析为内部的Entry,MysqlReader通过CanalServer提供的api获取这些Entries。
  2. 数据转换。该阶段主要对Entries进行模型转换,由MessageParser将canal组装好的Entry对象转换为自定义的RdbEventRecord对象。

Mysql-Reader的具体工作原理分如下三个阶段介绍:

  • 【启动canal】
    > build canalFilter
    根据源端要同步的库名和表名生成canal的过滤表达式,过滤不需要同步的库和表。
    > generate CanalInstance
    首先进行组装CanalConfig、创建AlarmHandler、动态生成slaveId等准备工作。其中,组装CanalConfig主要从MysqlReaderParameter的参数配置中获取,包括组装源端数据库、配置起始位点、内存缓冲区大小、HA模式(限定为HEARTBEAT)、心跳检测SQL和频率、黑名单等。
    然后,创建具体的CanalInstance,主要完成以下工作:
    (1)初始化MetaManager,设置位点管理器和canal过滤器;
    (2)启动EventParser,设置支持的Mysql的Binlog类型和格式;
    (3)初始化EventSink,设置数据处理的模式。根据源端master数据库的个数设置EventSink:当master数据库个数>1且GroupSinkMode为Coordinate时,将EventSink初始化为FixedGroupEventSink,支持事务保留,其他情况下将EventSink初始化为EntryEventSink。
    > start canalServer
    启动canal服务器,并从相应位点发起客户端消费订阅,同时订阅filter的变化。这里的MysqlReader相当于一个canal的消费端。
    注: 默认使用MetaManager中记录的最后一次消费位点,若为空,则使用canal实例存储的第一个位点。
  • 【获取数据】
    > get Message withoutAck
    canal服务器在检查确认消费端(此处即为Task)已启动并发起订阅之后,才会拿到该客户端的canal实例进行数据获取。首先判断该实例的上批次数据是否仍存在,若存在,则继续使用上批次events,不再获取新数据,若不存在,则根据上批次的batchId来获取本批次events。然后将获取到的events转换为entrys,并把该批次的batchId和entrys封装为Message返回给Mysql-Reader。随后,Mysql-Reader再对Message数据做具体的处理,包括计算该批次数据对应的binlog日志大小、dump数据详情、将entrys解析为RdbEventRecord等。
    > batchTimeout
    Mysql-Reader利用canal服务器获取数据之前,会首先判断Task是否启动了获取批量数据的超时时间控制(batchTimeout),默认不进行超时控制,当无数据时,进行轮询处理。若进行超时控制,当无数据时,则按照超时时间处理。
    注:轮询处理时,为避免空循环机器挂死,允许最多重试3次,超过3次,最多sleep10毫秒。
  • 【解析数据】
    > do filter
    canal返回的entrys首先需要处理数据过滤,主要包括Transaction Begin/End过滤、回环表以及回环数据过滤、canal心跳表数据过滤。
    > parse Entry
    首先将Entry中的数据转换成RowChange,得到其事件类型EventType,然后根据不同的EventType对Entry进行处理。对于Query事件类型的Entry,直接过滤;对于ddl事件类型的Entry,若Mysql-Reader开启了同步ddl开关,则将其转换为RdbEventRecord对象,若未开启则直接忽略;对于dml事件类型的Entry,可以从rowData的beforeColumns和afterColumns中得到所有主键和非主键列变更前和变更后的数据,并按照EventColumn的index进行排序,最后将其转换为RdbEventRecord对象。

三、功能介绍

  • 【HA机制】

    > canal的ha分为两部分,canal server和canal client分别有对应的ha实现:
    canal server:为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态。
    canal client:为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。

    > mysql的ha通过配置多个读IP实现,当源端数据库出现异常时,能够自动切换到健康的DB

  • 【过滤机制】

    过滤Binlog:过滤不需要同步的数据库、数据表、事务头、事务尾、回环表、心跳表、以及Query类型的数据等,只保留需要同步的Binlog。

    > 过滤事件:可以设置需要过滤的特定事件类型的RecordINSERT/UPDATE/DELETE,可多选)。

  • 【dump数据详情】

    Mysql-Reader拿到canal返回的Message之后,可以设置其进行dump Records的详细信息。由于数据量比较大,默认不进行dunp。

  • 【消费起始时间】

    > Mysql-Reader可以配置消费Binlog的起始时间戳,当Task第一次启动时,若指定了起始时间戳,则从基于指定时间戳最近的时间开始寻找消费位点;若不指定起始时间戳,则默认从源端数据库的当前最新时间开始寻找消费位点

    Task同步过程中,可以重置消费位点,即将消费时间改为过去的某个时间,则Mysql-Reader将从新的时间戳开始寻找消费位点,实现当在数据出现异常时的追数据、补数据的功能。

  • 【支持ddl同步】

    Mysql-Reader可以设置启用自动同步ddl类型的Events,将其转换为RdbEventRecord对象,主要用于往关系型数据库的同步。系统默认开启该功能。

  • 【其他类型数据源支持】
    > 基于canal的设计机制,Mysql-Reader源端支持的数据源类型除了Mysql,还支持MariaDB 5.5.35和10.0.7(理论上可支持以下版本),以及部分oracle版本。

四、插件参数说明

  • 【MysqlReaderParameter】
    在继承Reader插件通用参数基类(PluginReaderParameter,详见深入Task)的基础上,MysqlReaderParameter还根据canal的特点扩展了自己的参数类,用户可以根据需求在页面更改其参数配置。

    MysqlReaderParameter扩展参数 参数描述 默认值 备注
    dumpDetail

    是否需要dump records详情

    false 可以设置dump records详情
    startTimeStamps 起始时间戳 源端数据库当前最新时间 Task第一次启动时,通过该时间戳寻找位点
    fallbackIntervalInSeconds 数据库发生切换查找时回退的时间 60(单位:s) 当源端数据库出现问题发生DB切换时,默认canal回退60s查找位点

    batchTimeout

    获取批量数据的超时时间

    -1L(单位:ms)

    -1代表不进行超时控制,0代表永久,>0则表示按照指定的时间进行控制

    messageBatchSize

    Message订阅批次大小

    2000

    参数调大了可以增大系统的吞吐率,调小了增加同步的实时性
    memoryStorageBufferSize 缓存记录数 32 * 1024(单位:byte) 内存存储的buffer大小
    memoryStorageBufferMemUnit 缓存记录单元大小 1024(单位:byte) 内存存储的buffer单元大小
    detectingSQL 心跳检测sql select 1 Reader插件与源端数据库之间有心跳检测线程
    detectingIntervalInSeconds 心跳检测频率 3(单位:s)
    detectingTimeoutThresholdInSeconds 心跳超时时间 30(单位:s)
    detectingRetryTimes 心跳检测失败重试次数 3
    defaultConnectionTimeoutInSeconds 连接超时时间(sotimeout) 30(单位:s)
    receiveBufferSize mysql连接接收到的BufferSize 64 * 1024(单位:byte)
    sendBufferSize mysql连接发送的BufferSize 64 * 1024(单位:byte)
    GroupSinkMode 分组同步模式下的Event-Sink模式 Coordinate

    Coordinate,//所有eventparser必须相互协同,保证event在时间序列上全局有序,保证所有分库必须都同时正常才进行数据同步(生产环境需用该模式,保证数据全局有序,把所有分库看成一个整体)

    Separate;//eventparser各自进行数据同步,不需要相互协同,event只是局部有序,一个分库出现问题不会影响其它分库数据同步(测试和预生产推荐使用该模式,因为测试和预生产数据量比较小,很容易出现某个分库长时间没数据的情况,这种情况下会出现协调等待,其它分库的数据也无法同步了)

    filteredEventTypes 需要过滤的某些特定类型的事件 INSERT,UPDATE,DELETE(可多选)
    blackFilter 正则表达式匹配表黑名单,忽略解析 .*\\._.* 默认过滤所有以"_"开头的表


  • 【RdbEventRecord】
    Mysql-Reader插件将每条Mysql的变更数据抽象为RdbEventRecord。其主要参数如下:

    RdbEventRecord参数 参数描述 备注

    tableName

    数据表名


    schemaName

    数据库(实例)名称


    EventType

    变更数据的业务类型(I/U/D/C/A/E)


    executeTime

    发生数据变更的业务时间


    oldKeys(List<EventColumn>)

    变更前的主键值

    和oldColumns不同的是,只有主键发生变化时,才需要给oldKeys设置值;

    而oldColumns,不管前后是否发生更新变化,都会赋值

    keys(List<EventColumn>)

    变更后的主键值

    如果是insert/delete,变更前和变更后的主键值是一样的

    oldColumns(List<EventColumn>)

    变更前非主键的其他字段


    columns(List<EventColumn>)

    变更后非主键的其他字段


    sql

    对应的sql语句

    当eventType = CREATE/ALTER/ERASE时,就是对应的sql语句,其他情况为动态生成的INSERT/UPDATE/DELETE sql

    ddlSchemaName

    ddl/query的schemaName

    会存在跨库ddl,需要保留执行ddl的当前schemaName

    hint

    生成对应的hint内容


    RSI Record资源标识符


五、Mapping参数说明

  • 【Mapping相关参数】
    一个Mapping代表了一个同步方向,即要将哪个库的哪张表的哪些字段和数据同步到哪个库的哪张表中。映射关系为一对多,即一张表可以同步到多个目标端数据源。Mysql-Reader插件的相关Mapping配置参数如下:

    Mapping参数 参数描述 备注

    taskId

    所属Task的id

    每个映射均属于一个Task

    sourceMediaId

    源端表的id

    源表名称的模式有:

    SINGLE,//正常单表名称

    MULTI,//支持类似"offer[0000-0031]"的分库分表模式

    WILDCARD,//支持全库同步的通配符

    YEARLY,//支持按年分表的表名模式

    MONTHLY;//支持按月分表的表名模式


六、版本说明

关联技术 稳定版本 待测版本
canal 1.0.24
mysql 5.7及以下

七、注意事项

  • 【变更表结构的一些限制】
    需要明确一点:
        * Binlog对每条日志事件,并没有记录列名,只是按顺序记录了每个列的值
        * com.alibaba.otter.canal.protocol.CanalEntry中的列名、列类型等信息,是canal拿到binlog数据,通过反查元数据表构造出来的
    所以,参与数据同步的表不能进行如下类型的表结构变更,否则Binlog回溯时可能会出现错误

    变更类型 描述
    Table-Rename 表重命名之后,如果进行binlog回溯,重命名之前binlog中保存的表名在数据库当前元数据中查不到
    canal会抛异常
    Column-Rename 列重命名没有binlog回溯的问题
    Column-Rename不支持,是因为脚本检测的时候,涉及到黑白名单的检查,实现起来比较麻烦,暂未实现
    Column-Drop 列删除之后,如果进行binlog回溯,删除前binlog中列的数量和数据库当前元数据中列的数量不一致,
    canal会抛异常,即使采取不抛错的策略,构造出来的【列名和列值映射】也是错误的
    Add-After-Column 增加列的时候,只能增加到末尾
    否则回溯binlog的时候,Add操作之前binlog中列的顺序和数据库当前元数据中列的顺序不一致,
    构造出来的【列名和列值映射】是错误的
    Modify-After-Column 原因同Add-After-Column

    ps:binlog回溯的相关问题可参考canal源码,com.alibaba.otter.canal.parse.inbound.mysql.dbsync.LogEventConvert

  • 【canal说明】

    Mysql-Reader插件依赖于canal提供数据库日志,针对mysql数据有一些要求,具体请查看: https://github.com/alibaba/canal/wiki/QuickStart

    注意:目前canal支持mixed,row,statement多种日志协议的解析,但配合DataLink进行数据库同步,目前仅支持row协议的同步,使用时需要注意。

八、FAQ

canal参考资料:https://github.com/alibaba/canal

Clone this wiki locally