跳到主要内容

MySQL 迁移同步到 Kafka

NineData 数据复制支持将 MySQL 中的数据复制到 Kafka,实现数据的实时流转和分发。

背景信息

Kafka 是一个分布式消息队列,具有高吞吐量、低延迟、高可靠性和可扩展性等特点,适用于实时数据处理和分发的场景。将 MySQL 中的数据复制到 Kafka 可以构建实时数据流处理系统,满足实时数据处理、数据分析、数据挖掘、业务监控等需求。此外,将 MySQL 中的数据复制到 Kafka 中还可以实现数据的解耦,从而减少系统间的依赖和耦合,使系统更易于维护扩展。

MySQL 中的数据复制到 Kafka 之后会以 JSON 数组的形式存储,JSON 数组中包含的字段详情请参见附录

功能介绍

NineData 数据复制支持数据源之间的结构、全量数据、增量数据的高性能复制,对于部分数据源,还提供双向复制功能,实现快捷构建异地多活业务架构。

  • 结构:支持同构及异构数据源之间的对象结构复制,很大程度上降低了两个数据源之间的数据复制门槛。
  • 全量数据:通过智能数据分片实现行级并发批量复制能力,有效保障复制性能。自主研发的新型断点续传技术,保证无主键表的数据准确性。
  • 增量数据:支持全对象类型的 DML|DDL 增量数据复制,结合行级并发、热点合并等技术,提供强劲复制性能。
  • 双向数据实时复制:直接多个节点之间的数据双向复制,保证所有节点的数据均保持最新状态。

通过以上功能,可以轻松高效地实现全量|增量数据复制全量|增量数据迁移全量|增量数据同步数据集成不停机无缝迁移等场景,为企业提供灵活和可靠的数据复制解决方案。

前提条件

  • 已将源和目标数据源添加至 NineData。如何添加,请参见创建数据源

  • 源的数据库类型为 MySQL 或类 MySQL 数据库,例如 MySQL、MariaDB、PolarDB MySQL、TDSQL-C、GaussDB MySQL 等。

  • 目标的数据库类型为 Kafka 0.10 及以上版本。

  • 源数据源必须开启 Binlog,并且 Binlog 相关参数设置如下:

    • binlog_format=ROW
    • binlog_row_image=FULL
    提示

    如果源数据源为备库,为保证获取完整的 Binlog 日志,还需要开启 log_slave_updates 参数。

使用限制

  • 数据复制功能仅针对数据源中的用户数据库,系统库不会被复制。例如:MySQL 类型数据源中的 information_schemamysqlperformance_schemasys 库不会被复制。
  • 执行数据同步前需评估源数据源和目标数据源的性能,同时建议业务低峰期执行数据同步。否则全量数据初始化时将占用源数据源和目标数据源一定的读写资源,导致数据库负载上升。
  • 需要确保同步对象中的每张表都有主键或唯一约束、列名具有唯一性,否则可能会重复同步相同数据。

操作步骤

商业化通知

NineData 数据复制产品已商业化,您仍然可以保有 10 条复制任务免费使用,注意事项如下:

  • 10 条复制任务中可以包含 1 条任务,规格为 Micro
  • 状态为的任务不算在 10 条任务的限制内,如果您已经创建了 10 条复制任务,还想要继续创建,可以先终止之前的复制任务,然后再创建新任务。
  • 创建复制任务时,仅可选择您已购买的,未购买的规格将以灰度显示,无法选择。如需购买,请通过页面右下角的客服图标联系我们。
  1. 登录 NineData 控制台

  2. 在左侧导航栏单击 >

  3. 页面,单击右上角的

  4. 页签,按照下表进行配置,并单击

    参数
    说明
    输入数据同步任务的名称,为了方便后续查找和管理,请尽量使用有意义的名称。最多支持 64 个字符。
    同步对象所在的数据源,此处选择需复制数据所在的数据源。
    接收同步对象的数据源,此处选择目标 Kafka 数据源。
    选择目标 Kafka Topic(主题),源数据源中的数据将写入到该指定 Topic 中。
    将数据投递到一个 Topic 中时,可以指定这些数据要投递到 Topic 中的哪个分区。
    • :将所有数据投递到默认的分区 0。
    • :将数据散列分配到不同的分区中,系统会使用库名和表名的哈希值来计算目标数据投递到哪个分区,以确保散列投递过程中,同一个表中的数据被投递到同一分区。
    支持两种
    • :复制源数据源的所有对象和数据,即全量数据复制。右侧的开关为周期性全量复制的开关,更多信息,请参见周期性全量复制
    • :在全量复制完成后,基于源数据源的日志进行增量复制。
    (仅的情况下不可选)

    仅在任务包含的情况下可选。

    复制任务的规格,规格越大复制的速率越高,将鼠标悬浮于details图标上即可查看每个规格对应的速率和配置信息。每个规格中显示了可用数量及规格总数,当可用数量为 0 时将以灰度显示,无法选择。

    仅为时需要选择。
    • :以当前复制任务开始时间为基准,进行增量复制。
    • :选择增量复制开始的时间点,您可以根据您的业务所在地域按需选择时区。如果将时间点配置为当前复制任务开始前,则如果该时间段内有 DDL 操作,复制任务将失败。
    • :预检查阶段检测到目标表中存在数据时,停止任务。
    • :预检查阶段检测到目标表中存在数据时,忽略该部分数据,追加写入其他数据。
    • :预检查阶段检测到目标表中存在数据时,删除该部分数据,重新写入。
  5. 页签,配置下列参数,然后单击

    参数
    说明
    选择需要复制的内容,您可以选择复制源库所有内容,也可以选择,在列表中选中需要复制的内容,单击>添加到右侧列表。

    如果您需要创建多条相同复制对象的复制链路,可以创建一个配置文件,在新建任务的时候导入即可。单击右上角的,再单击下载模板,将配置文件模版下载到本地,编辑完成后单击上传该配置文件即可实现批量导入。配置文件说明:

    参数
    说明
    source_table_name需同步的对象所在的源表名。
    destination_table_name接收同步对象的目标表名。
    source_schema_name需同步的对象所在的源 Schema 名。
    destination_schema_name接收同步对象的目标 Schema 名。
    source_database_name需同步的对象所在的源库名。
    target_database_name接收同步对象的目标库名。
    column_list需要同步的字段列表。
    extra_configuration额外的配置信息,您可以在这里配置如下信息:
    • 字段映射:column_namedestination_column_name
    • 字段取值:column_value
    • 数据过滤:filter_condition
    提示
    • extra_configuration 的示例内容如下:

      {
      "column_name": "created_time", //指定需要执行列名映射的原列名
      "destination_column_name": "migrated_time", //目标列名映射为 "migrated_time"
      "column_value": "current_timestamp()", //将列的字段取值更改为当前时间戳
      "filter_condition": "id != 0" //ID 不为 0 的行才会同步。
      }
    • 配置文件的整体示例内容请参见下载的模版。

  6. 页签,可以单独配置需要复制到 Kafka 的每个列,默认情况下将复制所选表的全部列。如果在配置映射阶段,源和目标数据源中有更新,可以单击页面右上角的按钮,重新获取源和目标数据源的信息。配置完成后,单击

  7. 页签,等待系统完成预检查,预检查通过后,单击

    提示
    • 如果预检查未通过,需要单击目标检查项右侧列的,排查失败的原因,手动修复后单击重新执行预检查,直到通过。

    • 的检查项,可视具体情况修复或忽略。
  8. 页面,提示,同步任务开始运行。此时您可以进行如下操作:

    • 单击可以查看同步任务各个阶段的执行情况。
    • 单击可以返回任务列表页面。

查看同步结果

  1. 登录 NineData 控制台

  2. 在左侧导航栏单击 >

  3. 页面单击目标同步任务的 ID,打开页面,页面说明如下。

    result_kafka

    序号
    功能
    说明
    1同步延迟源数据源和目标数据源之间的数据同步延迟,0 秒表示两端之间没有延迟,即 Kafka 端的数据目前已追平源端。
    2配置告警配置告警后,系统会在任务失败时通过您选择的方式通知您。更多信息,请参见运维监控简介
    3更多
    • :暂停任务,仅状态为的任务可选。
    • :结束未完成或监听中(即增量同步中)的任务,终止任务后无法重启任务,请谨慎操作。
    • :删除任务,任务删除后无法恢复,请谨慎操作。
    4全量复制(包含全量复制的场景下显示)展示全量复制的进度和详细信息。
    • 单击页面右侧的:查看全量复制过程中的各监控指标。全量复制过程中,还可以单击监控指标页面右侧的,限制每秒写入到目标数据源的速率。单位为 MB/S。
    • 单击页面右侧的:查看全量复制的执行日志。
    • 单击页面右侧的refresh图标:查看最新的信息。
    5增量复制(包含增量复制的场景下显示)展示增量复制的各项监控指标。
    • 单击页面右侧的:限制每秒写入到目标数据源的速率。单位为行/秒。
    • 单击页面右侧的:查看增量复制的执行日志。
    • 单击页面右侧的refresh图标:查看最新的信息。
    6修改对象展示同步对象的修改记录。
    • 单击页面右侧的,可对同步对象进行配置
    • 单击页面右侧的refresh图标:查看最新的信息。
    7展开展示当前复制任务的详细信息,包括等。

附录 1:数据格式说明

MySQL 迁移到 Kafka 的数据将以 JSON 格式存储,系统会均等划分 MySQL 中的数据存到多个 JSON 对象中,每个 JSON 代表一个消息(Message)。

每个 JSON 对象中包含了如下字段:

字段字段类型字段描述字段示例
serverIdSTRING消息归属的数据源信息,格式:<连接地址:端口>。"serverId":"47.98.224.21:3307"
idLONG消息的 Record id。本字段全局递增,用作消息重复消费的判断依据。"Id":156
esINT不同的任务阶段代表不同的含义:
  • 全量复制阶段:表示全量数据复制任务的开始时间,以 Unix 时间戳表示。
  • 增量复制阶段:表示 Binlog 中每个事件(EVENT)对应的时间。
"es":1668650385
tsINT更多数据被投递到 Kafka 的时间,以 Unix 时间戳表示。"ts":1668651053
isDdlBOOLEAN数据是否为 DDL,取值:
  • true:是
  • false:否
"is_ddl":true
typeSTRING数据的类型,取值:
  • INIT:表示全量数据复制。
  • INSERT:表示 INSERT 操作。
  • DELETE:表示 DELETE 操作。
  • UPDATE:表示 UPDATE 操作。
  • DDL:表示 DDL 操作。
"type":"INIT"
databaseSTRING数据所属的数据库。"database":"database_name"
tableSTRING数据所属的表。如果 DDL 语句对应的对象为非表,则该字段取值为 null"table":"table_name"
mysqlTypeJSON记录数据在 MySQL 中的数据类型,以 JSON 数组表示。"mysqlType": {"id": "bigint(20)", "shipping_type": "varchar(50)" }
sqlTypeJSON预留字段,无需关注。"sqlType":null
pkNamesARRAYBinlog 中的记录(Record)所对应的主键(Primary Key)名称。取值:
  • 如果该 Record 是 DDL 类型,取值为 null
  • 如果该 Record 是 INIT 或 DML 类型,取值为该 Record 的主键名称。
"pkNames": ["id", "uid"]
dataARRAY[JSON]MySQL 投递到 Kafka 的数据,存储在 JSON 格式的数组中。
  • 全量数据复制场景(type = INIT):存储了 MySQL 投递到 Kafka 的全量数据。
  • 增量数据复制场景:存储了 Binlog 中对于数据所做的变更详情。
    • INSERT:各字段中插入操作的值。
    • UPDATE:各字段中更新操作(更新后)的值。
    • DELETE:各字段中删除操作的值。
    • DDL:对表 DDL 操作之后的表结构。
"old": [{ "name": "someone", "phone": "(737)1234787", "email": "someone@example.com", "address": "somewhere", "country": "china" }]
oldARRAY[JSON]记录 MySQL 到 Kafka 的增量复制详情。
  • UPDATE:各字段中更新操作之前的值。
  • DDL:对表 DDL 操作之前的表结构。

除此之外的其他操作,该字段的值均为 null
"old": [{ "name": "someone", "phone": "(737)1234787", "email": "someone@example.com", "address": "somewhere", "country": "china" }]
sqlSTRING如果当前数据是增量的 DDL 操作,则记录该操作对应的 SQL 语句。除此之外的其他操作,该字段的值均为 null"sql":"create table sbtest1(id int primary key,name varchar(20))"

附录 2:预检查项一览表

检查项检查内容
源数据源连接检查检查源数据源网关状态、实例是否可达、用户名及密码准确性
目标数据源连接检查检查目标数据源网关状态、实例是否可达、用户名及密码准确性
源库权限检查检查源数据库的账号权限是否满足要求
源数据库 log_slave_updates 是否支持当源数据库为从库时,检查 log_slave_updates 是否为 ON
源数据源和目标数据源版本检查检测源数据库与目标数据库的版本是否兼容
源数据库是否开启 Binlog检查源数据库是否开启 Binlog
源数据库 Binlog Format 是否支持检查源数据库的 binlog 格式是否为 'ROW'
源数据库 binlog_row_image 是否支持检查源数据库的 binlog_row_image 是否为 'FULL'
目标库权限检查检查 Kafka 账号是否对 Topic 有访问权限
目标库数据存在性检查检查 Topic 中是否已存在数据

相关文档

数据复制简介