PostgreSQL 迁移同步到 Kafka
NineData 数据复制支持将 PostgreSQL 中的数据复制到 Kafka,企业可以通过该方法实现高效的数据迁移与同步,确保 Kafka 系统中的数据一直处于最新状态。
背景信息
Kafka 作为一个分布式流媒体平台,广泛应用于实时数据传输与处理。PostgreSQL 是流行的关系型数据库管理系统,常用于存储结构化数据。在许多应用场景中,将 PostgreSQL 的数据实时同步到 Kafka,可以提高数据的处理速度与可扩展性,为企业的数据分析、日志处理等提供支持。
前提条件
已将源和目标数据源添加至 NineData。如何添加,请参见创建数据源。
目标的数据库类型为 Kafka 0.10 及以上版本。
您必须拥有源数据源的如下权限:
复制类型 权限 全量复制 CONNECT、SELECT 增量复制 SUPERUSER 如需增量复制,请打开
postgresql.conf
文件配置如下参数,如果找不到该文件的位置,可以在 psql 客户端中执行SHOW config_file;
SQL 命令查看。- 源数据源的
wal_level
参数必须为logical
,如需确认当前值,可以直接在客户端中执行SHOW wal_level
查看。 - 源数据源的
wal_sender_timeout
参数设置为0
。此参数用于中断处于停滞状态超过指定毫秒数的复制连接,默认值为 60000 毫秒。设置为 0 将将禁用超时机制。如需确认当前值,可以直接在客户端中执行SHOW wal_sender_timeout
查看。 - 源数据源的
max_replication_slots
参数必须大于1
。此参数指定服务器可以支持的最大复制槽数。默认值为10。 - 源数据源的
max_wal_senders
参数必须大于1
。此参数指定最大的并发连接数。默认值为10。如需确认当前值,可以直接在客户端中执行SHOW max_wal_senders
查看。
- 源数据源的
使用限制
- 执行数据同步前需评估源数据源和目标数据源的性能,同时建议业务低峰期执行数据同步。否则全量数据初始化时将占用源数据源和目标数据源一定的读写资源,导致数据库负载上升。
- 需要确保同步对象中的每张表都有主键或唯一约束、列名具有唯一性,否则可能会重复同步相同数据。
操作步骤
NineData 数据复制产品已商业化,您仍然可以保有 10 条复制任务免费使用,注意事项如下:
- 10 条复制任务中可以包含 1 条任务,规格为 Micro。
- 状态为的任务不算在 10 条任务的限制内,如果您已经创建了 10 条复制任务,还想要继续创建,可以先终止之前的复制任务,然后再创建新任务。
- 创建复制任务时,仅可选择您已购买的,未购买的规格将以灰度显示,无法选择。如需购买,请通过页面右下角的客服图标联系我们。
登录 NineData 控制台。
在左侧导航栏单击 > 。
在页面,单击右上角的。
在页签,按照下表进行配置,并单击。
参数 说明 输入数据同步任务的名称,为了方便后续查找和管理,请尽量使用有意义的名称。最多支持 64 个字符。 同步对象所在的数据源,此处选择需复制数据所在的数据源。 接收同步对象的数据源,此处选择目标 Kafka 数据源。 选择目标 Kafka Topic(主题),源数据源中的数据将写入到该指定 Topic 中。 将数据投递到一个 Topic 中时,可以指定这些数据要投递到 Topic 中的哪个分区。 - :将所有数据投递到默认的分区 0。
- :将数据散列分配到不同的分区中,系统会使用库名和表名的哈希值来计算目标数据投递到哪个分区,以确保散列投递过程中,同一个表中的数据被投递到同一分区。
支持和两种。 - :复制源数据源的所有对象和数据,即全量数据复制。右侧的开关为周期性全量复制的开关,更多信息,请参见周期性全量复制。
- :在全量复制完成后,基于源数据源的日志进行增量复制。
(仅的情况下不可选) 仅在任务包含或的情况下可选。
复制任务的规格,规格越大复制的速率越高,将鼠标悬浮于图标上即可查看每个规格对应的速率和配置信息。每个规格中显示了可用数量及规格总数,当可用数量为 0 时将以灰度显示,无法选择。
仅为时需要选择。 - :以当前复制任务开始时间为基准,进行增量复制。
- :选择增量复制开始的时间点,您可以根据您的业务所在地域按需选择时区。如果将时间点配置为当前复制任务开始前,则如果该时间段内有 DDL 操作,复制任务将失败。
- :预检查阶段检测到目标表中存在数据时,停止任务。
- :预检查阶段检测到目标表中存在数据时,忽略该部分数据,追加写入其他数据。
- :预检查阶段检测到目标表中存在数据时,删除该部分数据,重新写入。
在页签,配置下列参数,然后单击。
参数 说明 选择需要复制的内容,您可以选择复制源库所有内容,也可以选择,在列表中选中需要复制的内容,单击>添加到右侧列表。 如果您需要创建多条相同复制对象的复制链路,可以创建一个配置文件,在新建任务的时候导入即可。单击右上角的,再单击下载模板,将配置文件模版下载到本地,编辑完成后单击上传该配置文件即可实现批量导入。配置文件说明:
参数 说明 source_table_name
需同步的对象所在的源表名。 destination_table_name
接收同步对象的目标表名。 source_schema_name
需同步的对象所在的源 Schema 名。 destination_schema_name
接收同步对象的目标 Schema 名。 source_database_name
需同步的对象所在的源库名。 target_database_name
接收同步对象的目标库名。 column_list
需要同步的字段列表。 extra_configuration
额外的配置信息,您可以在这里配置如下信息: - 字段映射:
column_name
、destination_column_name
- 字段取值:
column_value
- 数据过滤:
filter_condition
提示extra_configuration
的示例内容如下:{
"column_name": "created_time", //指定需要执行列名映射的原列名
"destination_column_name": "migrated_time", //目标列名映射为 "migrated_time"
"column_value": "current_timestamp()", //将列的字段取值更改为当前时间戳
"filter_condition": "id != 0" //ID 不为 0 的行才会同步。
}配置文件的整体示例内容请参见下载的模版。
- 字段映射:
在页签,可以单独配置需要复制到 Kafka 的每个列,默认情况下将复制所选表的全部列。如果在配置映射阶段,源和目标数据源中有更新,可以单击页面右上角的按钮,重新获取源和目标数据源的信息。配置完成后,单击。
在页签,等待系统完成预检查,预检查通过后,单击。
提示如果预检查未通过,需要单击目标检查项右侧列的,排查失败的原因,手动修复后单击重新执行预检查,直到通过。
- 为的检查项,可视具体情况修复或忽略。
在页面,提示,同步任务开始运行。此时您可以进行如下操作:
- 单击可以查看同步任务各个阶段的执行情况。
- 单击可以返回任务列表页面。
查看同步结果
登录 NineData 控制台。
在左侧导航栏单击 > 。
在页面单击目标同步任务的 ID,打开页面,页面说明如下。
序号 功能 说明 1 同步延迟 源数据源和目标数据源之间的数据同步延迟,0 秒表示两端之间没有延迟,即 Kafka 端的数据目前已追平源端。 2 配置告警 配置告警后,系统会在任务失败时通过您选择的方式通知您。更多信息,请参见运维监控简介。 3 更多 - :暂停任务,仅状态为的任务可选。
- :结束未完成或监听中(即增量同步中)的任务,终止任务后无法重启任务,请谨慎操作。
- :删除任务,任务删除后无法恢复,请谨慎操作。
4 全量复制(包含全量复制的场景下显示) 展示全量复制的进度和详细信息。 - 单击页面右侧的:查看全量复制过程中的各监控指标。全量复制过程中,还可以单击监控指标页面右侧的,限制每秒写入到目标数据源的速率。单位为 MB/S。
- 单击页面右侧的:查看全量复制的执行日志。
- 单击页面右侧的图标:查看最新的信息。
5 增量复制(包含增量复制的场景下显示) 展示增量复制的各项监控指标。 - 单击页面右侧的:限制每秒写入到目标数据源的速率。单位为行/秒。
- 单击页面右侧的:查看增量复制的执行日志。
- 单击页面右侧的图标:查看最新的信息。
6 修改对象 展示同步对象的修改记录。 - 单击页面右侧的,可对同步对象进行配置。
- 单击页面右侧的图标:查看最新的信息。
7 展开 展示当前复制任务的详细信息,包括、、等。
附录 1:数据格式说明
PostgreSQL 迁移到 Kafka 的数据将以 JSON 格式存储,系统会均等划分 PostgreSQL 中的数据存到多个 JSON 对象中,每个 JSON 代表一个消息(Message)。
- 全量复制阶段:单个消息存储的 PostgreSQL 数据行数由 message.max.bytesmessage.max.bytes 是 Kafka 集群中允许的最大消息大小,默认值:
1000000
字节(即 1MB)。您可以通过修改 Kafka 配置文件中的message.max.bytes
参数来调整该值,这样每个消息中将会存储更多的 PostgreSQL 数据行数。请注意,由于 Kafka 需要为每个消息分配内存空间,因此该值如果设置得过大,可能导致 Kafka 集群性能下降。 参数决定。 - 增量复制阶段:单个消息存储一行 PostgreSQL 数据。
字段 | 字段类型 | 字段描述 | 字段示例 |
---|---|---|---|
serverId | STRING | 消息归属的数据源信息,格式:<连接地址:端口> 。 | "serverId":"47.98.224.21:3307" |
id | LONG | 消息的 Record id。本字段全局递增,用作消息重复消费的判断依据。 | "Id":156 |
es | INT | 以 Unix 时间戳表示,不同的任务阶段代表不同的含义:
| "es":1668650385 |
ts | INT | 更多数据被投递到 Kafka 的时间,以 Unix 时间戳表示。 | "ts":1668651053 |
isDdl | BOOLEAN | 数据是否为 DDL,取值:
| "is_ddl":true |
type | STRING | 数据的类型,取值:
| "type":"INIT" |
database | STRING | 数据所属的数据库。 | "database":"database_name" |
table | STRING | 数据所属的表。如果 DDL 语句对应的对象为非表,则该字段取值为 null 。 | "table":"table_name" |
mysqlType | JSON | 预留字段,无需关注。 | "mysqlType": null |
sqlType | JSON | 数据在源 PostgreSQL 中时,各字段的数据类型。 | "sqlType": {"id": "NUMBER","shipping_type":"varchar2(50)"} |
pkNames | ARRAY | Binlog 中的记录(Record)所对应的主键(Primary Key)名称。取值:
| "pkNames": ["id", "uid"] |
data | ARRAY[JSON] | PostgreSQL 投递到 Kafka 的数据,存储在 JSON 格式的数组中。
| "data": [{ "name": "jl", "phone": "(737)1234787", "email": "caicai@yahoo.edu", "address": "zhejiang", "country": "china" }] |
old | ARRAY[JSON] | 记录 PostgreSQL 到 Kafka 的增量复制详情。
除此之外的其他操作,该字段的值均为 null 。 | "old": [{ "name": "someone", "phone": "(737)1234787", "email": "someone@example.com", "address": "somewhere", "country": "china" }] |
sql | STRING | 如果当前数据是增量的 DDL 操作,则记录该操作对应的 SQL 语句。除此之外的其他操作,该字段的值均为 null 。 | "sql":"create table sbtest1(id int primary key,name varchar(20))" |
附录 2:预检查项一览表
检查项 | 检查内容 |
---|---|
源数据源连接检查 | 检查源数据源网关状态、实例是否可达、用户名及密码准确性 |
目标数据源连接检查 | 检查目标数据源网关状态、实例是否可达、用户名及密码准确性 |
源库权限检查 | 检查源数据库的账号权限是否满足要求 |
目标库权限检查 | 检查 Kafka 账号是否对 Topic 有访问权限 |
目标库数据存在性检查 | 检查 Topic 中是否已存在数据 |
检查 wal_level | 检查源数据源的 wal_level 是否为 logical |
检查 max_wal_senders | 检查 max_wal_senders 是否满足复制连接数要求 |
检查 max_replication_slots | 检查 max_replication_slots 是否满足复制槽数量要求 |