跳到主要内容

Kafka 复制到 ClickHouse

NineData 数据复制支持将 Kafka 中的数据复制到 ClickHouse,实现数据的实时集成。

背景信息

Kafka 是一个分布式消息队列,具有高吞吐量、低延迟、高可靠性和可扩展性等特点,适用于实时数据处理和分发的场景,同时也成为上游数据仓库的核心数据来源。

而 ClickHouse 是一个列式数据库管理系统(DBMS),通过将 Kafka 的数据实时集成到 ClickHouse,可以有效地处理和分析大量的实时数据。

功能介绍

NineData 数据复制支持数据源之间的结构、全量数据、增量数据的高性能复制,对于 MySQL 数据源之间的复制,还提供双向复制功能,实现快捷构建异地多活业务架构。

  • 结构:支持同构及异构数据源之间的对象结构复制,很大程度上降低了两个数据源之间的数据复制门槛。
  • 全量数据:通过智能数据分片实现行级并发批量复制能力,有效保障复制性能。自主研发的新型断点续传技术,保证无主键表的数据准确性。
  • 增量数据:支持全对象类型的 DML|DDL 增量数据复制,结合事务级并发、热点合并等技术,在保障事务一致性的基础上,提供强劲复制性能。
  • 双向数据实时复制(仅 MySQL 之间):直接多个节点之间的数据双向复制,保证所有节点的数据均保持最新状态。

通过以上功能,可以轻松高效地实现全量|增量数据复制全量|增量数据迁移全量|增量数据同步数据集成不停机无缝迁移等场景,为企业提供灵活和可靠的数据复制解决方案。

前提条件

  • 已将源和目标数据源添加至 NineData。如何添加,请参见创建数据源
  • 源数据源为 Kafka 0.10 及以上版本。
  • 目标数据源为 ClickHouse 20.3 及以上版本。
  • 基于 Kafka JSON 对象中各字段的名称,已在目标端的 ClickHouse 上创建相对应的库、表、列。

操作步骤

  1. 登录 NineData 控制台

  2. 在左侧导航栏单击

  3. 页面中单击

  4. 页签,按照下表进行配置,并单击

    参数
    说明
    输入数据同步任务的名称,为了方便后续查找和管理,请尽量使用有意义的名称。最多支持 64 个字符。
    同步对象所在的数据源,此处选择需复制数据所在的 Kafka 数据源。
    接收同步对象的数据源,此处选择目标 ClickHouse 数据源。
    选择需要复制的 Kafka Topic(主题)。
    Message Format选择数据将以什么格式复制到 ClickHouse,当前仅支持 Canal-JSON
    • :从 Kafka Topic 中寻找最早的一条消息,并按顺序复制到 ClickHouse。
    • :忽略 Kafka Topic 中已存在的数据,仅复制任务开始之后新产生的消息。
    • :自定义 Kafka Topic 中的消息位置,即 Offset 编号。系统将从该 Offset 编号开始按顺序复制消息。
    • :预检查阶段检测到目标表中存在数据时,停止任务。
    • :预检查阶段检测到目标表中存在数据时,忽略该部分数据,追加写入其他数据。
    • :预检查阶段检测到目标表中存在数据时,删除该部分数据,重新写入。
  5. 页签,配置下列参数,然后单击

    参数
    说明
    选择 Kafka 数据复制到 ClickHouse 之后的存储位置,您可以选择包含 ClickHouse 的所有库,也可以选择列表中选中需要存储 Kafka 数据的位置,然后单击>添加到右侧列表。
  6. 页签,可以单独配置 Kafka 消息中的键值对与每个 ClickHouse 库表列之间的映射关系,只要符合您配置条件的 Kafka 消息,都会保存在 ClickHouse 中对应的位置。更多信息,请参见附录

    提示

    Message FormatCanal-JSON 的情况下,您需要为每个 ClickHouse 的库和表配置 Kafka 中 $.database$.table 字段的映射。例如,将 $.database 配置为 ninedata_m2c$.table 配置为 sbtest1,则 Kafka 中符合该条件的所有消息均会复制到 ClickHouse 目标库表的对应列中。

    key_value_config

  7. 页签,等待系统完成预检查,预检查通过后,单击

    提示
    • 如果预检查未通过,需要单击目标检查项右侧列的,排查失败的原因,手动修复后单击重新执行预检查,直到通过。

    • 的检查项,可视具体情况修复或忽略。
  8. 页面,提示,同步任务开始运行。此时您可以进行如下操作:

    • 单击可以查看同步任务各个阶段的执行情况。
    • 单击可以返回任务列表页面。

查看同步结果

  1. 登录 NineData 控制台

  2. 在左侧导航栏单击

  3. 页面单击目标同步任务的 ID,打开页面,页面说明如下。

    kafka2mysql_result

    序号
    功能
    说明
    1同步延迟源数据源和目标数据源之间的数据同步延迟,0 秒表示两端之间没有延迟,即表示 Kafka 中目前没有消费数据。
    2配置告警配置告警后,系统会在任务失败时通过您选择的方式通知您。更多信息,请参见运维监控简介
    3更多
    • :暂停任务,仅状态为的任务可选。
    • :结束未完成或监听中(即增量同步中)的任务,终止任务后无法重启任务,请谨慎操作。
    • :删除任务,任务删除后无法恢复,请谨慎操作。
    4增量复制展示增量复制的各项监控指标。
    • 单击页面右侧的:查看增量复制的执行日志。
    • 单击页面右侧的refresh图标:查看最新的信息。
    5修改对象展示同步对象的修改记录。
    • 单击页面右侧的,可对同步对象进行配置
    • 单击页面右侧的refresh图标:查看最新的信息。
    6更多展示当前复制任务的详细信息,包括正在同步中的 Kafka TopicMessage Format等。

附录:JSON_PATH 的定义

Kafka 迁移到 ClickHouse 是通过提取 Kafka 中 JSON 对象中的数据并转存到 ClickHouse 中目标库表的方式来实现的,而 JSON_PATH 是用于定位 Kafka JSON 对象中特定元素的一种路径表达式,本章节介绍 JSON_PATH 的定义。

JSON_PATH 由 ScopepathExpression 两个主要部分组成:

  • Scope:由美元符号($)表示,代表 JSON 的根。
  • pathExpression:用于匹配 JSON 对象中特定元素的路径,由一个或多个 pathLeg 组成,这些 pathLeg 可以是 memberarrayLocationdoubleAsterisk,多个 pathLeg 代表多个层级。
    • member:匹配 JSON 对象中的 Key。格式为.<Key 名称>.*,前者将在 JSON 对象中基于指定的 Key 名称查找;后者表示 JSON 对象中的所有 Key。
    • arrayLocation:匹配 JSON 对象中的数组元素。格式为[<非负整数>][*],前者将在 JSON 对象中基于指定的非负整数定位数组元素;后者表示 JSON 对象中的所有数组元素。
    • doubleAsterisk:匹配任意层级的 Key 或数组位置。格式为.**

示例:以下是一段 Kafka 中的 JSON 对象示例,通过 JSON_PATH 进行匹配。

{
"data": [{
"id": "2",
"name": "John",
"address": "2"
}],
"database": "9zcloud",
"es": 1693809702000,
"id": 63,
"isDdl": false,
"mysqlType": {
"id": "int",
"name": "varchar(20)",
"address": "varchar(50)"
},
"old": null,
"pkNames": ["id"],
"sql": "",
"sqlType": {
"id": 4,
"name": 12,
"address": -5
},
"table": "t",
"ts": 1693809701769,
"type": "INSERT"
}
  • $.data:匹配名为 data 的 JSON 对象。
  • $.data.id:匹配 data JSON 对象下名为 id 的 Key。
  • $.**:匹配 JSON 对象中任意层级的任意 Key。

附录 2:预检查项一览表

检查项检查内容
目标库数据存在性检查检查待复制对象在目标数据库中是否已存在数据
源数据源连接检查检查源数据源网关状态、实例是否可达、用户名及密码准确性
目标数据源连接检查检查目标数据源网关状态、实例是否可达、用户名及密码准确性
目标库权限检查检查目标数据库的账号权限是否满足要求
检查目标表是否包含系统字段检查目标表是否包含系统字段 _sign 、 _version
目标库同名对象存在性检查检查待复制对象在目标数据库中是否已存在

相关文档

数据复制简介