
MySQL to Kafka

NineData's data replication feature can replicate data from MySQL to Kafka, enabling real-time data flow and distribution.

Background Information

Kafka is a distributed message queue that offers high throughput, low latency, high reliability, and scalability, which makes it well suited to real-time data processing and distribution. Replicating data from MySQL to Kafka therefore helps you build a real-time stream processing system for data analysis, data mining, business monitoring, and similar needs. It also decouples the systems that consume the data from the source database, reducing dependencies between them and making them easier to maintain and scale.

After data is replicated from MySQL to Kafka, it is stored as a JSON array. For details about the fields it contains, see the Data Format Specification appendix.

Prerequisites

  • The source and target data sources have been added to NineData. For instructions on how to add data sources, please refer to Creating a Data Source.

  • The source database is MySQL or a MySQL-like database, such as MySQL, MariaDB, PolarDB MySQL, TDSQL-C, GaussDB MySQL, and so on.

  • The target database is Kafka version 0.10 or above.

  • The source data source must have binlog enabled, with the following binlog-related parameters set:

    • binlog_format=ROW
    • binlog_row_image=FULL
    tip

    If the source data source is a replica (standby) database, enable the log_slave_updates parameter so that the binlog is captured completely.
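
The binlog prerequisites above can be verified before a task is created. The following is a minimal sketch, assuming the server variables have already been fetched (for example with SHOW VARIABLES) into a dictionary. The function name check_binlog_settings and the is_replica flag are hypothetical and are not part of NineData.

```python
def check_binlog_settings(variables, is_replica=False):
    """Return a list of problems found in MySQL server variables.

    `variables` maps variable names to values, such as the rows returned
    by SHOW VARIABLES. This helper is illustrative, not a NineData API.
    """
    # Required values taken from the prerequisites listed above.
    required = {
        "log_bin": "ON",
        "binlog_format": "ROW",
        "binlog_row_image": "FULL",
    }
    if is_replica:
        # A replica must also write replicated changes to its own binlog.
        required["log_slave_updates"] = "ON"

    problems = []
    for name, expected in required.items():
        actual = str(variables.get(name, "")).upper()
        if actual != expected:
            problems.append(
                f"{name} should be {expected}, found {actual or 'unset'}"
            )
    return problems
```

An empty list means the settings satisfy the prerequisites; otherwise each entry names the variable that needs to be changed.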

Restrictions

  • Data replication applies only to user databases in the data source. System databases are not replicated. For example, the information_schema, mysql, performance_schema, and sys databases in MySQL data sources are not replicated.
  • Before performing data synchronization, evaluate the performance of the source and target data sources. Full data initialization consumes read and write resources on both data sources and increases their load, so it is recommended to perform data synchronization during off-peak hours.
  • Each table to be synchronized must have a primary key or unique constraint, and its column names must be unique. Otherwise, the same data may be synchronized more than once.

Operation Steps

  1. Log in to the NineData console.

  2. Click on > in the left navigation bar.

  3. On the page, click in the upper right corner.

  4. On the tab, configure the parameters according to the table below, and click .

    Parameter
    Description
    Enter a meaningful name for the data synchronization task, which will make it easier to search for and manage later. Supports up to 64 characters.
    Select the MySQL data source where the data to be copied is located as the synchronization object.
    Select the target Kafka data source that will receive the synchronized objects.
    Select the target Kafka Topic, where the data from MySQL will be written to.
    When delivering data to a Topic, you can specify which partition the data should be delivered to.
    • : Deliver all data to the default partition 0.
    • : Hash the data and distribute it to different partitions. The system will use the hash value of the database name and table name to determine which partition the target data will be delivered to, ensuring that data from the same table is delivered to the same partition during the hash delivery process.
    Supports two types of migration: and .
    • : Copy all objects and data from the source data source, i.e. full data replication.
    • : After the full replication is completed, perform incremental replication based on the logs of the source data source. Click the setting icon to uncheck certain operation types according to your needs. Once unchecked, these operations will be ignored during incremental synchronization.
    Note: You can also click to expand , and select the processing strategy when there are tables or data with the same name.
    Only required when is .
    • : Perform incremental replication based on the start time of the current replication task.
    • : Select the time point when incremental replication begins. You can select the time zone according to your business location as needed. If the time point is configured before the start of the current replication task, the replication task will fail if there are any DDL operations in that time period.
    • : Stop the task when data is detected in the target table during the pre-check phase.
    • : Ignore the conflicting data detected in the target table during the pre-check phase and append other data to it.
    • : Delete the conflicting data detected in the target table during the pre-check phase and rewrite it.
  5. On the tab, configure the following parameters, and then click .

    Parameter
    Description
    Select the content to be copied. You can choose to copy all content from the source database, or choose , select the content to be copied in the list, and click > to add it to the list on the right.
    (Optional) Click to add a blacklist record, then select the database or object to add to the blacklist. Blacklisted content is not copied. Use this to exclude certain databases or objects in the full database replication of or .
    • Left drop-down box: Select the name of the database to be added to the blacklist.
    • Right drop-down box: Select the object in the corresponding database. You can click multiple objects for multiple selections. Leave it blank to add the entire database to the blacklist.
    If you want to add multiple databases to the blacklist, you can click the Add button below to add a row.
  6. On the tab, you can configure each column that needs to be copied to Kafka separately. By default, all columns of the selected table will be copied.

  7. On the tab, wait for the system to complete the pre-check. After the pre-check passes, click .

    tip
    • If the pre-check fails, click in the column to the right of the target check item, troubleshoot the cause of the failure, manually fix it, and then click to run the pre-check again until it passes.
    • Check items with a status in can be repaired or ignored based on specific circumstances.
  8. On the page, when prompted with , the synchronization task begins to run. At this point, you can perform the following operations:

    • Click to view the execution status of each stage of the synchronization task.
    • Click to return to the task list page.
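
The hash-based partition delivery described in step 4 can be sketched as follows. This is only an illustration of the idea: the CRC32 hash and the function name choose_partition are assumptions, and the hash function that NineData actually uses is not stated in this document.

```python
import zlib


def choose_partition(database, table, num_partitions):
    """Map a (database, table) pair to a partition index.

    CRC32 is an assumption chosen for illustration; the hash function
    used by the real system is not documented here.
    """
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # The key depends only on the database and table names, so every
    # row of the same table lands in the same partition.
    key = f"{database}.{table}".encode("utf-8")
    return zlib.crc32(key) % num_partitions
```

Because Kafka preserves message order only within a partition, sending all changes for one table to one partition keeps that table's changes in order for consumers.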

View Synchronization Results

  1. Log in to the NineData console.

  2. Click on > in the left-hand navigation menu.

  3. On the page, click on the ID of the target synchronization task to open the page, which is explained as follows:


    Number
    Function
    Explanation
    1. Synchronization Delay: The data synchronization delay between the source and target data sources. A value of 0 seconds means there is no delay between the two ends, that is, the data on the Kafka side has caught up with the MySQL side.
    2. Configuration Alert: After an alert is configured, the system notifies you through your chosen method when the task fails. For more information, see Introduction to Operation and Maintenance Monitoring.
    3. More:
    • : Pause the task. This option is available only for tasks in status.
    • : End the unfinished or listening (i.e., incremental synchronization) task. After the task is terminated, the task cannot be restarted, so please operate with caution. If the synchronization object contains triggers, the trigger copying option will pop up. Please select as needed.
    • : Delete the task. Once the task is deleted, it cannot be restored, so please operate with caution.
    4. Full Copy (displayed only for tasks that include full copy): Displays the progress and details of the full copy.
    • Click on on the right side of the page to view various monitoring indicators during full copy. During full copy, you can also click on on the right side of the monitoring indicator page to limit the rate of writing to the target data source per second. The unit is MB/s.
    • Click on on the right side of the page to view the execution log of the full copy.
    • Click on the refresh icon on the right side of the page to view the latest information.
    5. Incremental Copy (displayed only for tasks that include incremental copy): Displays various monitoring indicators of incremental synchronization.
    • Click on on the right side of the page: limit the rate of writing to the target data source per second. The unit is rows/second.
    • Click on on the right side of the page: view the execution log of incremental replication.
    • Click on the refresh icon on the right side of the page: view the latest information.
    6. Modify Object: Displays the modification records of the synchronized object.
    • Click on on the right side of the page to configure the synchronized object.
    • Click on the refresh icon on the right side of the page to view the latest information.
    7. Expand: Displays detailed information about the current replication task, including , , , etc.

Appendix 1: Data Format Specification

Data migrated from MySQL to Kafka is stored in JSON format. The system evenly divides the data in MySQL into multiple JSON objects, and each JSON object is delivered as one message.

Each JSON object contains the following fields:

serverId (STRING)
The data source to which the message belongs, in the format <connection_address:port>.
Example: "serverId":"47.98.224.21:3307"

id (LONG)
The record ID of the message. This field increases globally and can be used to detect duplicate message consumption.
Example: "id":156

es (INT)
The meaning depends on the task stage:
  • Full replication stage: the start time of the full data replication task, as a Unix timestamp.
  • Incremental replication stage: the time of the corresponding event in the binlog.
Example: "es":1668650385

ts (INT)
The time when the data was delivered to Kafka, as a Unix timestamp.
Example: "ts":1668651053

isDdl (BOOLEAN)
Whether the data is a DDL operation, with values:
  • true: Yes
  • false: No
Example: "isDdl":true

type (STRING)
The type of data, with values:
  • INIT: Represents full data replication.
  • INSERT: Represents an INSERT operation.
  • DELETE: Represents a DELETE operation.
  • UPDATE: Represents an UPDATE operation.
  • DDL: Represents a DDL operation.
Example: "type":"INIT"

database (STRING)
The database to which the data belongs.
Example: "database":"database_name"

table (STRING)
The table to which the data belongs. If the object of the DDL statement is not a table, the value is null.
Example: "table":"table_name"

mysqlType (JSON)
The MySQL data type of each field, represented as a JSON object that maps field names to types.
Example: "mysqlType": {"id": "bigint(20)", "shipping_type": "varchar(50)" }

sqlType (JSON)
Reserved field that can be ignored.
Example: "sqlType":null

pkNames (ARRAY)
The primary key names of the record in the binlog. Values:
  • If the record is of DDL type, the value is null.
  • If the record is of INIT or DML type, the value is the primary key name of that record.
Example: "pkNames": ["id", "uid"]

data (ARRAY[JSON])
The data delivered from MySQL to Kafka, stored as an array of JSON objects.
  • Full data replication scenario (type = INIT): Stores the full data delivered from MySQL to Kafka.
  • Incremental data replication scenario: Stores the details of the changes recorded in the binlog.
    • INSERT: The value of each field in the inserted row.
    • UPDATE: The value of each field after the update.
    • DELETE: The value of each field in the deleted row.
    • DDL: The table structure after the DDL operation.

old (ARRAY[JSON])
Records the details of the previous state during incremental replication from MySQL to Kafka.
  • UPDATE: The value of each field before the update.
  • DDL: The table structure before the DDL operation.
For other operations, the value of this field is null.
Example: "old": [{ "name": "someone", "phone": "(737)1234787", "email": "someone@example.com", "address": "somewhere", "country": "china" }]

sql (STRING)
If the current data is an incremental DDL operation, records the SQL statement of that operation. For other operations, the value of this field is null.
Example: "sql":"create table sbtest1(id int primary key,name varchar(20))"
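
A consumer of these messages typically decodes the JSON, skips duplicates by id, and pairs the old and data entries. The following is a minimal sketch using only the fields described above. The helper names, the sample row values, and the choice to key duplicates by (serverId, id) are assumptions for illustration, and the set of seen IDs grows without bound, so a real consumer would need a bounded store.

```python
import json


def parse_message(raw):
    """Decode one Kafka message value (bytes or str) into a dict."""
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8")
    return json.loads(raw)


def describe_changes(msg):
    """Return a list of (operation, before, after) tuples for a message."""
    op = msg["type"]
    if op == "DDL":
        # For DDL, the executed statement is carried in the sql field.
        return [("DDL", None, msg.get("sql"))]
    rows = msg.get("data") or []
    old_rows = msg.get("old") or [None] * len(rows)
    changes = []
    for before, after in zip(old_rows, rows):
        if op == "DELETE":
            # data holds the values of the deleted row.
            changes.append((op, after, None))
        elif op == "UPDATE":
            # old holds pre-update values, data holds post-update values.
            changes.append((op, before, after))
        else:  # INIT or INSERT
            changes.append((op, None, after))
    return changes


def delivery_lag_seconds(msg):
    """Seconds between the source time (es) and delivery to Kafka (ts)."""
    return msg["ts"] - msg["es"]


class Deduplicator:
    """Remember (serverId, id) pairs so repeated messages can be skipped."""

    def __init__(self):
        self._seen = set()

    def is_duplicate(self, msg):
        key = (msg["serverId"], msg["id"])
        if key in self._seen:
            return True
        self._seen.add(key)
        return False


# Hypothetical UPDATE message; the row values are made up for illustration.
sample = json.dumps({
    "serverId": "47.98.224.21:3307", "id": 156,
    "es": 1668650385, "ts": 1668651053,
    "type": "UPDATE", "database": "database_name", "table": "table_name",
    "pkNames": ["id"],
    "data": [{"id": 1, "name": "someone else"}],
    "old": [{"id": 1, "name": "someone"}],
    "sql": None,
})
```

Calling describe_changes(parse_message(sample)) yields one UPDATE tuple with the row before and after the change, which a downstream system can apply or audit.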

Appendix 2: Checklist of Pre-Check Items

Check Item: Check Content
Source Data Source Connection Check: Check the status of the source data source gateway, instance accessibility, and accuracy of username and password
Target Data Source Connection Check: Check the status of the target data source gateway, instance accessibility, and accuracy of username and password
Source Database Permission Check: Check if the account permissions in the source database meet the requirements
Check if Source Database log_slave_updates is Supported: Check if log_slave_updates is set to ON when the source database is a slave
Source Data Source and Target Data Source Version Check: Check if the versions of the source database and target database are compatible
Check if Source Database is Enabled with Binlog: Check if the source database is enabled with Binlog
Check if Source Database Binlog Format is Supported: Check if the binlog format of the source database is 'ROW'
Check if Source Database binlog_row_image is Supported: Check if the binlog_row_image of the source database is 'FULL'
Target Database Permission Check: Check if the Kafka account has access permissions for the Topic
Target Database Data Existence Check: Check if data exists in the Topic
