OceanBase MySQL to Kafka
NineData data replication supports replicating data from OceanBase MySQL to Kafka, enabling real-time data flow and distribution.
Prerequisites
- The source and target data sources have been added to NineData. For instructions on how to add data sources, please refer to Creating a Data Source.
- The source database is OceanBase MySQL.
- The target database is Kafka version 0.10 or above.
- Binlog is enabled on the source data source, with the following binlog-related parameters set (a verification sketch follows the tip below):
  - `binlog_format=ROW`
  - `binlog_row_image=FULL`

:::tip
If the source data source is a standby (backup) database, the `log_slave_updates` parameter must also be enabled to ensure that complete binlog logs can be obtained.
:::
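If you want to confirm these settings before creating the task, they can be queried directly on the source. The following is a minimal sketch, assuming a MySQL-compatible connection through the pymysql package; the host, port, and credentials are placeholders for your environment.

```python
# Minimal sketch: verify the binlog-related prerequisites on the source
# (OceanBase MySQL mode). Connection details below are placeholders.
import pymysql

conn = pymysql.connect(host="obproxy.example.com", port=2883,
                       user="repl_user", password="***")
try:
    with conn.cursor() as cur:
        checks = [
            ("binlog_format", "ROW"),
            ("binlog_row_image", "FULL"),
            ("log_slave_updates", "ON"),  # only required when the source is a standby database
        ]
        for variable, expected in checks:
            cur.execute("SHOW VARIABLES LIKE %s", (variable,))
            row = cur.fetchone()
            value = row[1] if row else "<not set>"
            status = "OK" if str(value).upper() == expected else "CHECK"
            print(f"{variable}={value} (expected {expected}) -> {status}")
finally:
    conn.close()
```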
Restrictions
- The data replication function only covers the user databases in the data source; system databases are not replicated. For example, the `information_schema`, `mysql`, `performance_schema`, and `sys` databases in MySQL-type data sources are not replicated.
- Before performing data synchronization, evaluate the performance of the source and target data sources, and it is recommended to synchronize during off-peak hours. Otherwise, the full data initialization will occupy a certain amount of read and write resources on both the source and target data sources, increasing database load.
- Ensure that every table in the synchronization objects has a primary key or unique constraint and that column names are unique; otherwise, the same data may be synchronized repeatedly. (A query sketch for spotting such tables follows this list.)
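Tables lacking a primary key or unique constraint can be listed ahead of time. The following is a hedged sketch, assuming the replication account can query `information_schema` on the source through pymysql; connection details are placeholders.

```python
# Sketch: list user tables on the source that have neither a primary key
# nor a unique constraint. Connection details are placeholders.
import pymysql

QUERY = """
SELECT t.table_schema, t.table_name
FROM information_schema.tables t
LEFT JOIN information_schema.table_constraints c
  ON  c.table_schema = t.table_schema
  AND c.table_name   = t.table_name
  AND c.constraint_type IN ('PRIMARY KEY', 'UNIQUE')
WHERE t.table_type = 'BASE TABLE'
  AND t.table_schema NOT IN ('information_schema', 'mysql', 'performance_schema', 'sys')
  AND c.constraint_name IS NULL
"""

conn = pymysql.connect(host="obproxy.example.com", port=2883,
                       user="repl_user", password="***")
try:
    with conn.cursor() as cur:
        cur.execute(QUERY)
        for schema, table in cur.fetchall():
            print(f"Missing PK/UNIQUE: {schema}.{table}")
finally:
    conn.close()
```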
Operation steps
NineData’s data replication product has been commercialized. You can still use 10 replication tasks for free, with the following considerations:

- Among the 10 replication tasks, you can include 1 incremental task, with the Micro specification.
- Tasks in the Terminated state do not count towards the 10-task limit. If you have already created 10 replication tasks and want to create more, you can terminate previous replication tasks and then create new ones.
- When creating replication tasks, you can only select specifications you have purchased. Specifications that have not been purchased are grayed out and cannot be selected. If you need to purchase additional specifications, please contact us through the customer service icon at the bottom right of the page.
Log in to the NineData Console.
Click on Replication > Data Replication in the left navigation bar.
On the Replication page, click on Create Replication in the top right corner.
On the Source & Target tab, configure according to the table below and click Next.
| Parameter | Description |
|---|---|
| Name | The name of the data synchronization task. To facilitate subsequent search and management, use a meaningful name. Up to 64 characters are supported. |
| Source | The data source where the objects to be replicated are located. |
| Target | The data source that receives the replicated objects. Select the target Kafka data source. |
| Kafka Topic | Select the target Kafka Topic. Data from the source data source is written to the specified Topic. |
| Delivery Partition | Specifies which partition of the Topic the data is delivered to (a conceptual hash sketch follows this table). <br />- Deliver All to Partition 0: deliver all data to the default partition 0. <br />- Deliver to different partition by the Hash value of [databaseName + tableName]: distribute data across partitions; the system uses the hash value of the database name and table name to decide which partition the data is delivered to, so that data from the same table is always delivered to the same partition. |
| Type | Two types are supported: Full and Incremental. <br />- Full: replicate all objects and data from the source data source, that is, full data replication. The switch on the right enables periodic full replication; for more information, refer to Periodic Full Replication. <br />- Incremental: after the full replication is completed, perform incremental replication based on the logs of the source data source. |
| Spec (not selectable for Schema-only tasks) | Selectable only when the task includes Full or Incremental. The specification of the replication task determines the replication speed: the larger the specification, the higher the replication rate. Hover over the icon to view the replication rate and configuration details of each specification. Each specification shows its available quantity and total quantity; if the available quantity is 0, it is grayed out and cannot be selected. |
| Incremental Started | Required only when Type includes Incremental. <br />- From Started: perform incremental replication starting from the time the replication task starts. <br />- Customized Time: select the start time of the incremental replication; you can choose the time zone according to your business region. If the configured time point is earlier than the task start time and DDL operations occurred during that period, the replication task will fail. |
| If target table already exists | - Pre-Check Error and Stop Task: stop the task if data is detected in the target table during the pre-check phase. <br />- Ignore the existing data and append: if data is detected in the target table during the pre-check phase, ignore it and append the other data. <br />- Clear the existing data before write: if data is detected in the target table during the pre-check phase, delete it before writing. |
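For the hash-based delivery option, the exact hash function is not spelled out here, but the behavior can be pictured with a small sketch: a stable hash of databaseName + tableName modulo the partition count, so every row of a given table is routed to the same partition. The hash function and partition count below are illustrative only, not NineData's implementation.

```python
# Conceptual sketch of hash-based partition assignment; NOT NineData's actual
# hash function, just an illustration of "hash(databaseName + tableName) % partitions".
import hashlib

def pick_partition(database_name: str, table_name: str, num_partitions: int) -> int:
    key = f"{database_name}{table_name}".encode("utf-8")
    digest = hashlib.md5(key).hexdigest()       # stable hash of the combined name
    return int(digest, 16) % num_partitions     # map onto the Topic's partitions

# All rows from the same table map to one partition; different tables may differ.
print(pick_partition("shop_db", "orders", 6))
print(pick_partition("shop_db", "users", 6))
```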
On the Objects tab, configure the following parameters, and then click Next.
| Parameter | Description |
|---|---|
| Replication Objects | Select the content to be replicated. You can choose Full Instance to replicate all content of the source database, or choose Customized Object and select the content to be replicated in the Source Object list, then click > to add it to the Target Object list on the right. If you need to create multiple replication chains with the same replication objects, you can create a configuration file and import it when creating a new task: click Import Config in the top right corner, click Download Template to download the configuration file template to your local machine, edit it, and then click Upload to upload the configuration file for batch import. |

Configuration file description:

| Parameter | Description |
|---|---|
| source_table_name | The name of the source table where the object to be synchronized is located. |
| destination_table_name | The name of the target table that receives the object to be synchronized. |
| source_schema_name | The name of the source Schema where the object to be synchronized is located. |
| destination_schema_name | The name of the target Schema that receives the object to be synchronized. |
| source_database_name | The name of the source database where the object to be synchronized is located. |
| target_database_name | The name of the target database that receives the object to be synchronized. |
| column_list | The list of columns to be synchronized. |
| extra_configuration | Additional configuration information. <br />- column_rules: defines column mapping and value rules. Fields: column_name (original column name), destination_column_name (target column name), column_value (value to assign, which can be an SQL function or a constant). <br />- filter_condition: specifies row-level filtering conditions; only rows that meet the conditions are replicated. |
:::tip
An example of `extra_configuration` is as follows:

```json
{
  "extra_config": {
    "column_rules": [
      {
        "column_name": "created_time",              // Original column name to map.
        "destination_column_name": "migrated_time", // Target column name mapped to "migrated_time".
        "column_value": "current_timestamp()"       // Change the column value to the current timestamp.
      }
    ],
    "filter_condition": "id != 0"                   // Only rows where id is not 0 will be synchronized.
  }
}
```

For the overall example content of the configuration file, please refer to the downloaded template.
:::
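To make the effect of the example concrete, the sketch below applies the same two rules to an in-memory row. This is only an illustration of the semantics, not NineData's implementation; the column and rule names mirror the JSON example above.

```python
# Illustration only: apply the example's column_rules and filter_condition to a row.
from datetime import datetime, timezone

def apply_example_rules(row):
    # filter_condition "id != 0": rows with id == 0 are not replicated.
    if row["id"] == 0:
        return None
    out = dict(row)
    # column_rules: created_time -> migrated_time, value set to the current timestamp.
    out.pop("created_time", None)
    out["migrated_time"] = datetime.now(timezone.utc).isoformat()
    return out

print(apply_example_rules({"id": 7, "created_time": "2023-01-01 00:00:00"}))
print(apply_example_rules({"id": 0, "created_time": "2023-01-01 00:00:00"}))  # filtered out -> None
```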
On the Mapping tab, you can individually configure each column to be replicated to Kafka; by default, all columns of the selected tables are replicated. If the source or target data source is updated while you are configuring the mapping, click the Refresh Metadata button in the top right corner of the page to refresh the information of the source and target data sources. After the configuration is completed, click Save and Pre-Check.
On the Pre-check tab, wait for the system to complete the pre-check. After the pre-check passes, click Launch.
:::tip
If the pre-check does not pass, click Details in the Actions column to the right of the failed check item, troubleshoot the cause of the failure, fix it manually, and then click Check Again to re-run the pre-check until it passes.

If the Result of a check item is Warning, you can fix it or ignore it depending on the specific situation.
:::
On the Launch page, the message Launch Successfully is displayed, and the synchronization task starts running. At this point, you can perform the following operations:
- Click View Details to view the execution of each stage of the synchronization task.
- Click Back to list to return to the Replication task list page.
View Synchronization Results
import Content1 from '../../ref/_ref-result-kafka.md';
Appendix: Data Format Specification
Data migrated from MySQL to Kafka is stored in JSON format. The system divides the MySQL data evenly into multiple JSON objects, with each JSON object representing one message.
- During the full replication phase, the number of MySQL data rows stored in a single message is determined by the `message.max.bytes` parameter. `message.max.bytes` is the maximum message size allowed in the Kafka cluster, with a default value of `1000000` bytes (1 MB). You can increase this value by modifying the `message.max.bytes` parameter in the Kafka configuration so that each message can hold more MySQL data rows. Note that setting this value too large may degrade Kafka cluster performance, because Kafka needs to allocate memory space for each message.
- During the incremental replication phase, a single message stores one row of MySQL data.
Each JSON object contains the following fields:
| Field Name | Field Type | Field Description | Field Example |
|---|---|---|---|
| serverId | STRING | The data source that the message belongs to, in the format `<connection_address:port>`. | "serverId":"47.98.224.21:3307" |
| id | LONG | The record ID of the message. This field increases globally and can be used to detect duplicate message consumption. | "Id":156 |
| es | INT | A timestamp whose meaning differs depending on the task stage (full or incremental replication). | "es":1668650385 |
| ts | INT | The time when the data was delivered to Kafka, represented as a Unix timestamp. | "ts":1668651053 |
| isDdl | BOOLEAN | Whether the data is a DDL statement. Values: true (DDL statement) and false (not a DDL statement). | "is_ddl":true |
| type | STRING | The operation type of the data. | "type":"INIT" |
| database | STRING | The database to which the data belongs. | "database":"database_name" |
| table | STRING | The table to which the data belongs. If the object of a DDL statement is not a table, the value of this field is null. | "table":"table_name" |
| mysqlType | JSON | The MySQL data types of the data, in JSON format. | "mysqlType": {"id": "bigint(20)", "shipping_type": "varchar(50)" } |
| sqlType | JSON | Reserved field; it can be ignored. | "sqlType":null |
| pkNames | ARRAY | The primary key column names of the record (Record) in the binlog. | "pkNames": ["id", "uid"] |
| data | ARRAY[JSON] | The data delivered from MySQL to Kafka, stored as a JSON array. | "data": [{ "name": "someone", "phone": "(737)1234787", "email": "someone@example.com", "address": "somewhere", "country": "china" }] |
| old | ARRAY[JSON] | For incremental UPDATE operations, records the values of the row before the change. For other operations, the value of this field is null. | "old": [{ "name": "someone", "phone": "(737)1234787", "email": "someone@example.com", "address": "somewhere", "country": "china" }] |
| sql | STRING | If the current data is an incremental DDL operation, records the SQL statement corresponding to the operation. For other operations, the value of this field is null. | "sql":"create table sbtest1(id int primary key,name varchar(20))" |
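To sanity-check the delivered format, a consumer can read the Topic and interpret the fields listed above. Below is a minimal sketch assuming the kafka-python package; the broker address, Topic name, and consumer group are placeholders, and the DDL flag is read under both spellings because the table lists the field as isDdl while the example shows is_ddl.

```python
# Minimal consumer sketch for inspecting the replicated JSON messages.
# Broker address, topic name, and group id are placeholders for your environment.
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "ninedata_replication_topic",                 # hypothetical Topic selected in the task
    bootstrap_servers="kafka.example.com:9092",
    group_id="replication-inspector",
    auto_offset_reset="earliest",
    value_deserializer=lambda v: json.loads(v.decode("utf-8")),
)

for message in consumer:
    record = message.value
    is_ddl = record.get("isDdl", record.get("is_ddl", False))
    if is_ddl:
        print(f"DDL on {record.get('database')}: {record.get('sql')}")
    else:
        rows = record.get("data") or []
        print(f"{record.get('type')} {record.get('database')}.{record.get('table')}: "
              f"{len(rows)} row(s), record id={record.get('id')}")
```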
Appendix 2: Checklist of Pre-Check Items
| Check Item | Check Content |
|---|---|
| Source Data Source Connection Check | Check the status of the source data source gateway, instance accessibility, and accuracy of username and password |
| Target Data Source Connection Check | Check the status of the target data source gateway, instance accessibility, and accuracy of username and password |
| Source Database Permission Check | Check if the account permissions in the source database meet the requirements |
| Check if Source Database log_slave_updates is Supported | Check if log_slave_updates is set to ON when the source database is a slave |
| Source Data Source and Target Data Source Version Check | Check if the versions of the source database and target database are compatible |
| Check if Source Database is Enabled with Binlog | Check if the source database is enabled with Binlog |
| Check if Source Database Binlog Format is Supported | Check if the binlog format of the source database is 'ROW' |
| Check if Source Database binlog_row_image is Supported | Check if the binlog_row_image of the source database is 'FULL' |
| Target Database Permission Check | Check if the Kafka account has access permissions for the Topic |
| Target Database Data Existence Check | Check if data exists in the Topic |