
PostgreSQL Migration and Synchronization to Kafka

NineData data replication supports replicating data from PostgreSQL to Kafka, allowing enterprises to achieve efficient data migration and synchronization, ensuring that the data in the Kafka system is always up-to-date.

Background Information

Kafka is a distributed streaming platform that is widely used for real-time data transmission and processing. PostgreSQL is a popular relational database management system, commonly used for storing structured data. In many application scenarios, synchronizing data from PostgreSQL to Kafka in real time can improve data processing speed and scalability, and supports enterprise use cases such as data analysis and log processing.

Prerequisites

  • The source and target data sources have been added to NineData. For instructions on how to add them, please refer to Create Data Source.

  • The target Kafka version is 0.10 or above.

  • You must have the following permissions for the source data source:

    Replication Type | Permissions
    Full Replication | CONNECT, SELECT
    Incremental Replication | SUPERUSER
  • For incremental replication, please open the postgresql.conf file and configure the following parameters. If you cannot find the location of the file, you can execute the SHOW config_file; SQL command in the psql client to view it.

    • The wal_level parameter of the source data source must be logical. To confirm the current value, you can directly execute SHOW wal_level in the client.
    • The wal_sender_timeout parameter of the source data source must be set to 0. This parameter terminates replication connections that have been inactive for longer than the specified number of milliseconds. The default value is 60000 milliseconds; setting it to 0 disables the timeout mechanism. To confirm the current value, execute SHOW wal_sender_timeout in the client.
    • The max_replication_slots parameter of the source data source must be greater than 1. This parameter specifies the maximum number of replication slots the server can support. The default value is 10.
    • The max_wal_senders parameter of the source data source must be greater than 1. This parameter specifies the maximum number of concurrently running WAL sender processes, that is, concurrent replication connections. The default value is 10. To confirm the current value, execute SHOW max_wal_senders in the client.
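
The four parameter requirements above can be checked together once the current values have been collected with SHOW. The following is a minimal sketch, not part of NineData: the function name check_prerequisites and the sample values are hypothetical, and it assumes that SHOW reports a disabled timeout as the string 0.

```python
def check_prerequisites(settings):
    """Return a list of problems found in the given PostgreSQL settings.

    `settings` maps parameter names to the strings returned by SHOW,
    for example {"wal_level": "logical", "max_wal_senders": "10"}.
    """
    problems = []
    if settings.get("wal_level") != "logical":
        problems.append("wal_level must be logical")
    # Assumption: a disabled timeout is reported as the plain string "0".
    if settings.get("wal_sender_timeout") != "0":
        problems.append("wal_sender_timeout must be 0")
    for name in ("max_replication_slots", "max_wal_senders"):
        value = settings.get(name)
        if value is None or not value.isdigit() or int(value) <= 1:
            problems.append(f"{name} must be greater than 1")
    return problems


# Hypothetical values for illustration, not taken from a real server.
current = {
    "wal_level": "replica",
    "wal_sender_timeout": "1min",
    "max_replication_slots": "10",
    "max_wal_senders": "10",
}
for problem in check_prerequisites(current):
    print(problem)
```

An empty list means that all four requirements listed above are satisfied for the supplied values.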

Usage Limitations

  • Assess the performance of the source and target data sources before synchronizing, and perform data synchronization during off-peak business hours where possible. Full data initialization consumes read and write resources on both the source and target data sources, which increases database load.
  • Ensure that each table in the synchronization object has a primary key or unique constraint and that its column names are unique. Otherwise, the same data may be synchronized repeatedly.

Operation Steps

Commercialization Notice

NineData’s data replication product has been commercialized. You can still use 10 replication tasks for free, with the following considerations:

  • Among the 10 replication tasks, you can include 1 Incremental task, with a specification of Micro.

  • Tasks with a status of Terminated do not count towards the 10-task limit. If you have already created 10 replication tasks and want to create more, you can terminate previous replication tasks and then create new ones.

  • When creating replication tasks, you can only select the Spec you have purchased. Specifications that have not been purchased will be grayed out and cannot be selected. If you need to purchase additional specifications, please contact us through the customer service icon at the bottom right of the page.

  1. Log in to the NineData Console.

  2. Click on Replication > Data Replication in the left navigation bar.

  3. On the Replication page, click on Create Replication in the top right corner.

  4. On the Source & Target tab, configure according to the table below and click Next.

    Parameter | Description
    Name | Enter a name for the data synchronization task. A meaningful name makes the task easier to find and manage later. Up to 64 characters are supported.
    Source | The data source that contains the objects to be replicated.
    Target | The data source that receives the replicated objects. Select the target Kafka data source.
    Kafka Topic | Select the target Kafka topic. Data from the source data source is written to this topic.
    Delivery Partition | Specify which partition of the topic the data is delivered to.
    • Deliver All to Partition 0: Deliver all data to the default partition 0.
    • Deliver to different partition by the Hash value of [databaseName + tableName]: Distribute data across partitions. The system uses the hash value of the database name and table name to determine the target partition, so data from the same table is always delivered to the same partition.
    Type | Two types are supported: Full and Incremental.
    • Full: Copy all objects and data from the source data source, that is, full data replication. The switch on the right enables periodic full replication. For more information, see Periodic Full Replication.
    • Incremental: After the full replication is completed, incremental replication is performed based on the logs of the source data source.
    Spec | Selectable only when the task contains Full or Incremental (not selectable for Schema-only tasks).

    The specification of the replication task determines the replication speed: the larger the specification, the higher the replication rate. Hover over the details icon to view the replication rate and configuration details for each specification. Each specification displays its available quantity and total quantity. If the available quantity is 0, the specification is grayed out and cannot be selected.

    Incremental Started | Required when Type is Incremental only.
    • From Started: Perform incremental replication starting from the time the current replication task starts.
    • Customized Time: Select the start time of incremental replication. You can choose the time zone according to your business region. If the selected time is earlier than the start of the current replication task, the task fails if any DDL operations occurred during that period.
    If target table already exists |
    • Pre-Check Error and Stop Task: Stop the task if data is detected in the target table during the pre-check phase.
    • Ignore the existing data and append: If data is detected in the target table during the pre-check phase, keep it and append the new data.
    • Clear the existing data before write: If data is detected in the target table during the pre-check phase, delete it and write the data again.
  5. On the Objects tab, configure the following parameters, and then click Next.

    Parameter | Description
    Replication Objects | Select the content to be replicated. Choose Full Instance to replicate all content of the source database, or choose Customized Object, select the content to be replicated in the Source Object list, and click > to add it to the Target Object list on the right.

    If you need to create multiple replication tasks with the same replication objects, you can create a configuration file and import it when creating a new task. Click Import Config in the top right corner, click Download Template to download the configuration file template to your local machine, edit it, and then click Upload to upload the configuration file and import the objects in bulk. Configuration file description:

    Parameter | Description
    source_table_name | The name of the source table that contains the object to be synchronized.
    destination_table_name | The name of the target table that receives the object to be synchronized.
    source_schema_name | The name of the source schema that contains the object to be synchronized.
    destination_schema_name | The name of the target schema that receives the object to be synchronized.
    source_database_name | The name of the source database that contains the object to be synchronized.
    target_database_name | The name of the target database that receives the object to be synchronized.
    column_list | The list of fields to be synchronized.
    extra_configuration | Additional configuration information can be set here:
    • column_rules: Used to define column mapping and value rules. Field descriptions:
      • column_name: Original column name.
      • destination_column_name: Target column name.
      • column_value: Value to assign, which can be an SQL function or a constant.
    • filter_condition: Used to specify row-level filtering conditions; only rows that meet the criteria will be copied.
    tip
    • An example of extra_configuration is as follows:

      {
        "extra_config": {
          "column_rules": [
            {
              "column_name": "created_time", // Original column name to map.
              "destination_column_name": "migrated_time", // Target column name mapped to "migrated_time".
              "column_value": "current_timestamp()" // Change the column value to the current timestamp.
            }
          ],
          "filter_condition": "id != 0" // Only rows where ID is not 0 will be synchronized.
        }
      }

    • For the overall example content of the configuration file, please refer to the downloaded template.

  6. On the Mapping tab, you can individually configure each column that needs to be replicated to Kafka. By default, all columns of the selected table are replicated. If the source or target data source changes while you are configuring the mapping, click the Refresh Metadata button in the top right corner of the page to refresh the information of the source and target data sources. When the configuration is complete, click Save and Pre-Check.

  7. On the Pre-check tab, wait for the system to complete the pre-check. After the pre-check passes, click Launch.

    tip
    • If the pre-check does not pass, click Details in the Actions column to the right of the failed check item, troubleshoot the cause of the failure, fix it manually, and then click Check Again to rerun the pre-check until it passes.

    • If the Result of a check item is Warning, you can fix or ignore it depending on the situation.

  8. On the Launch page, the message Launch Successfully is displayed and the synchronization task starts running. You can then perform the following operations:

    • Click View Details to view the execution of each stage of the synchronization task.
    • Click Back to list to return to the Replication task list page.
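
The hash-based Delivery Partition option from step 4 can be illustrated with a minimal sketch. The hash function that NineData actually uses is not described on this page, so the CRC32 hash and the function name choose_partition below are assumptions chosen only to show the idea: a stable hash of the database name plus table name always maps the same table to the same partition.

```python
import zlib


def choose_partition(database_name, table_name, partition_count):
    """Map a table to a partition using a stable hash of its qualified name.

    Assumption: CRC32 stands in for whatever hash the real system uses.
    """
    key = (database_name + table_name).encode("utf-8")
    return zlib.crc32(key) % partition_count


# Hypothetical database and table names on a topic with 4 partitions.
for table in ("orders", "users", "items"):
    print(table, choose_partition("shop", table, 4))
```

Because the partition depends only on the database and table names, all changes to one table land in a single partition and keep their relative order there, while different tables may be spread across partitions.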

View Synchronization Results

  1. Log in to NineData Console.

  2. Click on Replication > Data Replication in the left navigation bar.

  3. On the Replication page, click the ID of the target synchronization task to open the Details page. The page is described below.


    No. | Feature | Description
    1 | Synchronization Delay | The data synchronization delay between the source and target data sources. A value of 0 seconds means there is no delay between the two ends, that is, the data on the Kafka side has caught up with the source side.
    2 | Configure Alerts | After you configure alerts, the system notifies you through the method you choose when the task fails. For more information, see Operations Monitoring Introduction.
    3 | More |
    • Pause: Pause the task. Only tasks with the status Running can be paused.
    • Terminate: End a task that is not completed or is listening (that is, in incremental synchronization). A terminated task cannot be restarted, so proceed with caution.
    • Delete: Delete the task. A deleted task cannot be recovered, so proceed with caution.
    4 | Full Replication (displayed when the task includes full replication) | Displays the progress and detailed information of full replication.
    • Click Monitor on the right side of the page to view monitoring metrics for the full replication process. During full replication, you can also click Flow Control Settings on the right side of the monitoring metrics page to limit the rate of writing to the target data source. The unit is MB/s.
    • Click Log on the right side of the page to view the execution logs of full replication.
    • Click the refresh icon on the right side of the page to view the latest information.
    5 | Incremental Replication (displayed when the task includes incremental replication) | Displays monitoring metrics for incremental replication.
    • Click Flow Control Settings on the right side of the page to limit the rate of writing to the target data source. The unit is rows/second.
    • Click Log on the right side of the page to view the execution logs of incremental replication.
    • Click the refresh icon on the right side of the page to view the latest information.
    6 | Modify Object | Displays the modification records of the synchronization object.
    • Click Modify Objects on the right side of the page to configure the synchronization object.
    • Click the refresh icon on the right side of the page to view the latest information.
    7 | Expand | Displays detailed information about the current replication task, including Type, Replication Objects, Started, etc.

Appendix 1: Data Format Description

The data migrated from PostgreSQL to Kafka is stored in JSON format. The system divides the PostgreSQL data evenly into multiple JSON objects, and each JSON object is delivered as one Kafka message.

Each JSON object contains the following fields:

Field | Field Type | Field Description | Field Example
serverId | STRING | The data source that the message belongs to, in the format <connection address:port>. | "serverId":"47.98.224.21:3307"
id | LONG | The record ID of the message. This value increases globally and is used as the basis for detecting duplicate consumption of a message. | "id":156
es | INT | A Unix timestamp whose meaning depends on the task stage:
  • Full replication phase: The start time of the full data replication task.
  • Incremental replication phase: The time of the corresponding event in the source log (the WAL for PostgreSQL).
  Example: "es":1668650385
ts | INT | The time when the data is delivered to Kafka, as a Unix timestamp. | "ts":1668651053
isDdl | BOOLEAN | Whether the data is DDL. Values:
  • true: Yes
  • false: No
  Example: "isDdl":true
type | STRING | The type of data. Values:
  • INIT: Indicates full data replication.
  • INSERT: Indicates INSERT operation.
  • DELETE: Indicates DELETE operation.
  • UPDATE: Indicates UPDATE operation.
  • DDL: Indicates DDL operation.
  Example: "type":"INIT"
database | STRING | The database to which the data belongs. | "database":"database_name"
table | STRING | The table to which the data belongs. If the DDL statement corresponds to a non-table object, this field is null. | "table":"table_name"
mysqlType | JSON | Reserved field; it can be ignored. | "mysqlType": null
sqlType | JSON | The data type of each field in the source PostgreSQL database. | "sqlType": {"id": "NUMBER","shipping_type":"varchar2(50)"}
pkNames | ARRAY | The names of the primary keys of the record in the source log. Values:
  • If the record is of DDL type, the value is null.
  • If the record is of INIT or DML type, the value is the primary key name of the record.
  Example: "pkNames": ["id", "uid"]
data | ARRAY[JSON] | The data delivered from PostgreSQL to Kafka, stored as a JSON array.
  • Full data replication scenario (type = INIT): Stores the full data delivered from PostgreSQL to Kafka.
  • Incremental data replication scenario: Stores the details of the changes made to the data.
    • INSERT: The value of each field in the inserted row.
    • UPDATE: The value of each field after the update.
    • DELETE: The value of each field in the deleted row.
    • DDL: The table structure after the DDL operation on the table.
  Example: "data": [{ "name": "jl", "phone": "(737)1234787", "email": "caicai@yahoo.edu", "address": "zhejiang", "country": "china" }]
old | ARRAY[JSON] | The pre-change details recorded during incremental replication from PostgreSQL to Kafka.
  • UPDATE: The value of each field before the update.
  • DDL: The table structure before the DDL operation on the table.
  For other operations, the value of this field is null.
  Example: "old": [{ "name": "someone", "phone": "(737)1234787", "email": "someone@example.com", "address": "somewhere", "country": "china" }]
sql | STRING | If the current data is an incremental DDL operation, the corresponding SQL statement. For other operations, the value of this field is null. | "sql":"create table sbtest1(id int primary key,name varchar(20))"
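
A consumer can read these fields with any JSON parser. The following is a minimal sketch that uses only the Python standard library and does not connect to Kafka. The function name process_messages and the sample message are hypothetical; the sample is assembled from the example values in the table above, and the key names follow the Field column. Tracking seen id values in memory is just one simple way to act on the duplicate-consumption note for the id field.

```python
import json


def process_messages(raw_messages):
    """Parse messages, skip repeated ids, and summarize each record."""
    seen_ids = set()
    summaries = []
    for raw in raw_messages:
        message = json.loads(raw)
        if message["id"] in seen_ids:
            continue  # This record id was already consumed.
        seen_ids.add(message["id"])
        summaries.append({
            "id": message["id"],
            "type": message["type"],
            "table": message["table"],
            # "data" may be null, so fall back to an empty list.
            "rows": len(message["data"] or []),
            # Seconds between the "es" timestamp and delivery to Kafka.
            "seconds_since_es": message["ts"] - message["es"],
        })
    return summaries


# Hypothetical message built from the example values in the field table.
sample = json.dumps({
    "serverId": "47.98.224.21:3307",
    "id": 156,
    "es": 1668650385,
    "ts": 1668651053,
    "isDdl": False,
    "type": "INIT",
    "database": "database_name",
    "table": "table_name",
    "mysqlType": None,
    "sqlType": {"id": "NUMBER", "shipping_type": "varchar2(50)"},
    "pkNames": ["id", "uid"],
    "data": [{"name": "jl", "country": "china"}],
    "old": None,
    "sql": None,
})

# The same message is passed twice to show that the repeat is skipped.
for summary in process_messages([sample, sample]):
    print(summary)
```

Note that during the full replication phase es is the task start time, so the difference between ts and es reflects per-event latency only for incremental records.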

Appendix 2: Pre-Check Item List

Check Item | Check Content
Source datasource connection check | Check the gateway status of the source datasource, whether the database is connectable, and whether the username and password are valid
Target datasource connection check | Check the gateway status of the target datasource, whether the database is connectable, and whether the username and password are valid
Source database privilege check | Check whether the account privileges of the source database meet the requirements
Target database privilege check | Check the Kafka account's access permissions on the topic
Target database data existence check | Check whether there is existing data in the topic
Check wal_level | Check whether the wal_level of the source datasource is set to logical
Check max_wal_senders | Check whether max_wal_senders meets the WAL sender requirements
Check max_replication_slots | Check whether max_replication_slots meets the replication slot requirements
