OceanBase MySQL to Kafka

NineData data replication supports replicating data from OceanBase MySQL to Kafka, enabling real-time data flow and distribution.

Prerequisites

  • The source and target data sources have been added to NineData. For instructions on how to add data sources, please refer to Creating a Data Source.

  • The source database is OceanBase MySQL.

  • The target database is Kafka version 0.10 or above.

  • The source data source must have binlog enabled, with the following binlog-related parameters set:

    • binlog_format=ROW
    • binlog_row_image=FULL
    Tip: If the source data source is a standby (replica) database, enable the log_slave_updates parameter so that the complete binlog is captured.
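The binlog prerequisites above can be checked before creating a task. The following is a minimal sketch, assuming the variable values have already been read from the source (for example with SHOW VARIABLES) into a dictionary. The function name check_binlog_settings and the is_replica flag are hypothetical and not part of NineData.

```python
# Required values taken from the prerequisites above.
REQUIRED = {"binlog_format": "ROW", "binlog_row_image": "FULL"}

# Some servers report boolean variables as 1/0 instead of ON/OFF.
BOOLEAN_ALIASES = {"1": "ON", "TRUE": "ON", "0": "OFF", "FALSE": "OFF"}


def check_binlog_settings(variables, is_replica=False):
    """Return a list of problems; an empty list means the settings pass.

    variables  -- dict of variable name -> value, as read from the source
    is_replica -- set to True when the source is a standby (replica) database
    """
    required = dict(REQUIRED)
    if is_replica:
        # Per the tip above, a replica also needs log_slave_updates enabled.
        required["log_slave_updates"] = "ON"

    problems = []
    for name, expected in required.items():
        actual = str(variables.get(name, "")).strip().upper()
        actual = BOOLEAN_ALIASES.get(actual, actual)
        if actual != expected:
            problems.append(f"{name} is {actual or 'unset'}, expected {expected}")
    return problems


if __name__ == "__main__":
    sample = {"binlog_format": "ROW", "binlog_row_image": "MINIMAL"}
    for problem in check_binlog_settings(sample):
        print(problem)
```

The precheck in NineData performs its own validation, so a check like this is only a convenience for catching misconfiguration early.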

Restrictions

  • Data replication applies only to user databases in the data source; system databases are not replicated. For example, the information_schema, mysql, performance_schema, and sys databases in MySQL-type data sources are not replicated.
  • Before synchronizing data, evaluate the performance of the source and target data sources, and preferably run the synchronization during off-peak hours. Full data initialization consumes read and write resources on both data sources and increases database load.
  • Make sure that each table among the synchronization objects has a primary key or unique constraint and that column names are unique. Otherwise, the same data may be synchronized repeatedly.

Operation steps

Commercialization Notice

NineData’s data replication product has been commercialized. You can still use 10 replication tasks for free, with the following considerations:

  • Among the 10 replication tasks, you can include 1 "Incremental" task, with a specification of Micro.

  • Tasks with a status of Terminated do not count towards the 10-task limit. If you have already created 10 replication tasks and want to create more, you can terminate previous replication tasks and then create new ones.

  • When creating replication tasks, you can only select the Spec you have purchased. Specifications that have not been purchased will be grayed out and cannot be selected. If you need to purchase additional specifications, please contact us through the customer service icon at the bottom right of the page.

  1. Log in to the NineData Console.

  2. In the left navigation pane, click Replication > Data Replication.

  3. On the Replication page, click Create Replication in the upper-right corner.

  4. On the Source & Target tab, configure the parameters in the following table, and click Next.

    Parameter: Description
    Name: Enter a name for the data synchronization task. To make the task easier to find and manage later, use a meaningful name. Up to 64 characters are supported.
    Source: The data source that contains the objects to synchronize.
    Target: The data source that receives the synchronized objects.
    Kafka Topic: Select the target Kafka Topic. Data from the source data source will be written to the specified Topic.
    Delivery Partition: When delivering data to a Topic, you can specify the partition to which the data is delivered.
      • Deliver All to Partition 0: Deliver all data to the default partition 0.
      • Deliver to different partition by the Hash value of [databaseName + tableName]: Hash data across different partitions. The system uses the hash value of the database name and table name to calculate the target partition, ensuring that data from the same table is delivered to the same partition during hash delivery.
    Target Object Name: Select the case conversion rule for object names after they are migrated from the source to the target.
      • Convert all to Lowercase: Regardless of the naming rule on the source, all target names are lowercase.
      • Consistent with Source: Keep the naming rule of the source.
      • Convert all to Uppercase: Regardless of the naming rule on the source, all target names are uppercase.
    Type: Select the replication type.
      • Full: Synchronize all objects and data from the source data source, namely full data replication.
      • Incremental: After full synchronization is complete, perform incremental synchronization based on the logs of the source data source.
    Incremental Started: Required only when Type is Incremental.
      • From Started: Use the current replication task start time as the baseline for incremental replication.
      • Customized Time: Select the point in time from which incremental replication starts. You can select a time zone based on the region of your business. If the configured time point is earlier than the current replication task start time and DDL operations occurred during that period, the replication task will fail.
    Spec: The specification of the replication task. A larger specification provides a higher replication rate. Hover over the details icon to view the rate and configuration information of each specification. If you configure the data replication task before purchasing resources, you can select the required specification here. If you purchase resources before configuring the task, the system selects the specification chosen during resource purchase, and you cannot change it in the task configuration.
    Target Table Exists Data (Required when Full is selected):
      • Pre-Check Error and Stop Task: Stop the task when data is detected in the target table during the precheck stage.
      • Ignore existing target data and append to it: When data is detected in the target table during the precheck stage, ignore that data and append other data.
      • Clear target existing data before write: When data is detected in the target table during the precheck stage, delete that data and write it again.
    Incremental data conflict handling strategy for target table (Required when Incremental is selected):
      • Runtime error: During incremental replication, report an error when target data already exists and wait for manual intervention.
      • Do not update target data: During incremental replication, do not write data when target data already exists, and continue subsequent tasks.
      • Update target data: During incremental replication, overwrite the target data when target data already exists.
  5. On the Objects tab, configure the following parameters, and click Next.

    Parameter: Description
    Replication Objects: Select the content to replicate. You can select All Objects to replicate all content in the source database, or select Customized Object, select the content to replicate in the Source Object list, and click > to add it to the Target Object list on the right.

    If you need to create multiple replication links with the same replication objects, you can create a configuration file and import it when creating a task. Click Import Config in the upper-right corner, click Download Template to download the configuration file template, edit the file, and then click Upload to upload it for batch import. Configuration file description:

    Parameter: Description
    source_table_name: The source table name of the object to synchronize.
    destination_table_name: The target table name that receives the synchronized object.
    source_schema_name: The source schema name of the object to synchronize.
    destination_schema_name: The target schema name that receives the synchronized object.
    source_database_name: The source database name of the object to synchronize.
    target_database_name: The target database name that receives the synchronized object.
    column_list: The list of columns to synchronize.
    extra_configuration: Additional configuration information. You can configure the following information here:
      • column_rules: Defines column mappings and value rules. Field descriptions:
        • column_name: Original column name.
        • destination_column_name: Specifies the target column name.
        • column_value: Specifies the column value, which can be an SQL function or a constant value.
      • filter_condition: Specifies row-level data filtering conditions. Only rows that meet the conditions are replicated.
    Tip:
    • The following is an example of extra_configuration:

      {
        "extra_config": {
          "column_rules": [
            {
              "column_name": "created_time", // Specifies the original column name to be mapped.
              "destination_column_name": "migrated_time", // Maps the target column name to "migrated_time".
              "column_value": "current_timestamp()" // Changes the column value to the current timestamp.
            }
          ],
          "filter_condition": "id != 0" // Only rows whose ID is not 0 are synchronized.
        }
      }
    • For a complete example of the configuration file, see the downloaded template.

  6. On the Mapping tab, you can separately configure each column to replicate to Kafka. By default, all columns of the selected table are replicated. If the source and target data sources are updated during mapping configuration, click Refresh Metadata in the upper-right corner of the page to refresh the source and target data source information. After completing the configuration, click Save and Pre-Check.

  7. On the Pre-check tab, wait for the system to complete the precheck. After the precheck is passed, click Launch.

    • You can select Enable data consistency comparison. After the synchronization task is complete, a data consistency comparison task based on the source data source automatically starts to ensure data consistency between both ends. Based on the selected Type, Enable data consistency comparison starts at the following times:

      • Full: Starts after full replication is complete.
      • Full+Incremental, Incremental: Starts when incremental data is consistent with the source data source for the first time and Delay is 0 seconds. You can click View Details and view the synchronization delay on the Details page.
    • If the precheck fails, click Details in the Actions column on the right of the failed check item, troubleshoot the cause, manually fix the issue, and then click Check Again to run the precheck again until it passes.

    • Check items whose Result is Warning can be fixed or ignored depending on the situation.

  8. On the Launch page, the Launch Successfully prompt appears, indicating that the synchronization task has started. You can then perform the following operations:

    • Click View Details to view the execution status of each stage of the synchronization task.

    • Click Back to list to return to the Replication task list page.
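The two Delivery Partition options in step 4 can be illustrated with a short sketch. The source does not document which hash function NineData uses or how the database and table names are combined, so the CRC32 hash, the plain concatenation, and the name choose_partition below are assumptions made for illustration only.

```python
import zlib


def choose_partition(database_name, table_name, partition_count, hash_by_table=True):
    """Pick a target partition for a change record.

    hash_by_table=False models "Deliver All to Partition 0".
    hash_by_table=True models hashing on [databaseName + tableName].
    """
    if partition_count < 1:
        raise ValueError("partition_count must be at least 1")
    if not hash_by_table:
        return 0
    # Assumption: concatenate the names and use CRC32 as a stable hash.
    # Python's built-in hash() is salted per process, so it is avoided here.
    key = f"{database_name}{table_name}".encode("utf-8")
    return zlib.crc32(key) % partition_count


if __name__ == "__main__":
    for table in ("orders", "customers", "orders"):
        print(table, choose_partition("shop", table, partition_count=8))
```

Whatever hash is used, the property that matters is the one the option describes: the same database and table always map to the same partition, so changes to one table stay in order within that partition.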

View Synchronization Results

  1. Log in to the NineData Console.

  2. Click on Replication > Data Replication in the left navigation bar.

  3. On the Replication page, click the ID of the target synchronization task to open the Details page. The page is described below.

    No. Feature: Description
    1. Synchronization Delay: The data synchronization delay between the source and target data sources. A value of 0 seconds means there is no delay between the two ends, that is, the data on the Kafka side has caught up with the source.
    2. Configure Alerts: After you configure alerts, the system notifies you through the method you choose when the task fails. For more information, see Operations Monitoring Introduction.
    3. More:
      • Pause: Pause the task. Only tasks with the status Running can be paused.
      • Terminate: End a task that is not yet complete or that is listening (that is, in incremental synchronization). A terminated task cannot be restarted, so proceed with caution.
      • Delete: Delete the task. A deleted task cannot be recovered, so proceed with caution.
    4. Full Replication (displayed in scenarios that include full replication): Displays the progress and detailed information of full replication.
      • Click Monitor on the right side of the page to view monitoring metrics for full replication. During full replication, you can also click Flow Control Settings on the right side of the monitoring metrics page to limit the write rate to the target data source. The unit is MB/s.
      • Click Log on the right side of the page to view the execution logs of full replication.
      • Click the refresh icon on the right side of the page to view the latest information.
    5. Incremental Replication (displayed in scenarios that include incremental replication): Displays monitoring metrics for incremental replication.
      • Click Flow Control Settings on the right side of the page to limit the write rate to the target data source. The unit is rows/second.
      • Click Log on the right side of the page to view the execution logs of incremental replication.
      • Click the refresh icon on the right side of the page to view the latest information.
    6. Modify Object: Displays the modification records of the synchronization objects.
      • Click Modify Objects on the right side of the page to configure the synchronization objects.
      • Click the refresh icon on the right side of the page to view the latest information.
    7. Expand: Displays detailed information about the current replication task, including Type, Replication Objects, Started, and so on.

Appendix: Data Format Specification

Data replicated from MySQL to Kafka is stored in JSON format. The system divides the MySQL data evenly into multiple JSON objects, and each JSON object is one message.

Each JSON object contains the following fields:

Field Name (Field Type): Field Description. Example.

serverId (STRING): The data source to which the message belongs, in the format <connection_address:port>.
Example: "serverId":"47.98.224.21:3307"

id (LONG): The record ID of the message. This field increments globally and can be used to detect duplicate message consumption.
Example: "id":156

es (INT): The meaning depends on the task stage:
  • Full replication stage: The start time of the full data replication task, as a Unix timestamp.
  • Incremental replication stage: The time of each event (EVENT) in the Binlog.
Example: "es":1668650385

ts (INT): The time when the data was delivered to Kafka, as a Unix timestamp.
Example: "ts":1668651053

isDdl (BOOLEAN): Whether the data is DDL. Values:
  • true: Yes
  • false: No
Example: "isDdl":true

type (STRING): The type of data. Values:
  • INIT: Represents full data replication.
  • INSERT: Represents INSERT operation.
  • DELETE: Represents DELETE operation.
  • UPDATE: Represents UPDATE operation.
  • DDL: Represents DDL operation.
Example: "type":"INIT"

database (STRING): The database to which the data belongs.
Example: "database":"database_name"

table (STRING): The table to which the data belongs. If the object of the DDL statement is not a table, the field value is null.
Example: "table":"table_name"

mysqlType (JSON): The MySQL data type of each column, represented as a JSON object keyed by column name.
Example: "mysqlType": {"id": "bigint(20)", "shipping_type": "varchar(50)" }

sqlType (JSON): Reserved field. It can be ignored.
Example: "sqlType":null

pkNames (ARRAY): The primary key names of the record (Record) in the Binlog. Values:
  • If the record is of DDL type, the value is null.
  • If the record is of INIT or DML type, the value is the primary key name of that record.
Example: "pkNames": ["id", "uid"]

data (ARRAY[JSON]): The data delivered from MySQL to Kafka, stored as a JSON array.
  • Full data replication scenario (type = INIT): Stores the full data delivered from MySQL to Kafka.
  • Incremental data replication scenario: Stores the details of the changes recorded in the Binlog.
    • INSERT: The values of each field for the insert operation.
    • UPDATE: The values of each field after the update operation.
    • DELETE: The values of each field for the delete operation.
    • DDL: The table structure after the DDL operation on the table.

old (ARRAY[JSON]): Records the incremental replication details from MySQL to Kafka. For other operations, the value of this field is null.
  • UPDATE: The values of each field before the update operation.
  • DDL: The table structure before the DDL operation on the table.
Example: "old": [{ "name": "someone", "phone": "(737)1234787", "email": "someone@example.com", "address": "somewhere", "country": "china" }]

sql (STRING): If the current data is an incremental DDL operation, records the SQL statement of the operation. For other operations, the value of this field is null.
Example: "sql":"create table sbtest1(id int primary key,name varchar(20))"
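A consumer can use the fields above to skip duplicates and to inspect updates. The following is a minimal sketch that works on message strings already read from the Topic, so it does not depend on any particular Kafka client. The names Deduplicator and changed_columns are hypothetical, and tracking every seen id in memory is an assumption chosen for simplicity, not a recommendation for long-running consumers.

```python
import json


class Deduplicator:
    """Detect repeated messages using the (serverId, id) pair."""

    def __init__(self):
        self._seen = set()

    def is_new(self, message):
        key = (message.get("serverId"), message.get("id"))
        if key in self._seen:
            return False
        self._seen.add(key)
        return True


def changed_columns(message):
    """For an UPDATE message, return one dict per row of {column: (before, after)}.

    Only columns present in "old" are compared, so the function behaves the
    same whether "old" carries every column or only the modified ones.
    """
    if message.get("type") != "UPDATE":
        return []
    changes = []
    for before, after in zip(message.get("old") or [], message.get("data") or []):
        changes.append({
            column: (old_value, after.get(column))
            for column, old_value in before.items()
            if after.get(column) != old_value
        })
    return changes


if __name__ == "__main__":
    raw = json.dumps({
        "serverId": "47.98.224.21:3307", "id": 156, "type": "UPDATE",
        "data": [{"name": "someone", "country": "japan"}],
        "old": [{"name": "someone", "country": "china"}],
    })
    dedup = Deduplicator()
    message = json.loads(raw)
    if dedup.is_new(message):
        print(changed_columns(message))
```

The sample payload in the usage block is made up for illustration and only includes a subset of the fields listed above.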

Appendix 2: Checklist of Pre-Check Items

Check Item: Check Content
Source Data Source Connection Check: Check the status of the source data source gateway, instance accessibility, and accuracy of username and password
Target Data Source Connection Check: Check the status of the target data source gateway, instance accessibility, and accuracy of username and password
Source Database Permission Check: Check if the account permissions in the source database meet the requirements
Check if Source Database log_slave_updates is Supported: Check if log_slave_updates is set to ON when the source database is a slave
Source Data Source and Target Data Source Version Check: Check if the versions of the source database and target database are compatible
Check if Source Database is Enabled with Binlog: Check if the source database is enabled with Binlog
Check if Source Database Binlog Format is Supported: Check if the binlog format of the source database is 'ROW'
Check if Source Database binlog_row_image is Supported: Check if the binlog_row_image of the source database is 'FULL'
Target Database Permission Check: Check if the Kafka account has access permissions for the Topic
Target Database Data Existence Check: Check if data exists in the Topic
