Kafka Replication to MySQL
NineData's data replication supports copying data from Kafka to MySQL, enabling real-time data synchronization and persistent storage.
Background Information
NineData provides a robust and reliable data replication solution that simplifies migrating data between different data stores and makes data management more efficient for developers. Leveraging Kafka's high throughput and real-time delivery, you can support distributed data processing scenarios while also migrating data from Kafka to a MySQL database for secure storage, real-time analysis, and querying.
Prerequisites
- Source and target data sources have been added to NineData. For instructions on adding data sources, refer to Creating a Data Source.
- The source data source is Kafka version 0.10 or later.
- The target data source is MySQL or a MySQL-compatible database such as MariaDB, PolarDB MySQL, TDSQL-C, or GaussDB MySQL.
- Corresponding databases, tables, and columns have been created on the target MySQL instance based on the field names in the Kafka JSON objects, as sketched below.
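The target schema must exist before the task runs. A minimal sketch of this prerequisite, assuming a hypothetical departments table in a sample_employees_210 database whose columns mirror the keys in the Kafka JSON payload (connection settings are placeholders):

```python
import pymysql  # pip install pymysql

# Hypothetical connection settings; replace with your target MySQL instance.
conn = pymysql.connect(host="mysql.example.com", user="replica",
                       password="***", database="sample_employees_210")
with conn.cursor() as cur:
    # Column names mirror the keys expected in the Kafka JSON payload.
    cur.execute("""
        CREATE TABLE IF NOT EXISTS departments (
            dept_no   CHAR(4)     PRIMARY KEY,
            dept_name VARCHAR(40) NOT NULL
        )
    """)
conn.commit()
conn.close()
```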
Steps
NineData’s data replication product has been commercialized. You can still use 10 replication tasks for free, with the following considerations:
- Among the 10 replication tasks, you can include 1 task with the Micro specification.
- Tasks that have been terminated do not count towards the 10-task limit. If you have already created 10 replication tasks and want to create more, you can terminate earlier replication tasks and then create new ones.
- When creating replication tasks, you can only select the specifications you have purchased. Specifications that have not been purchased are grayed out and cannot be selected. If you need to purchase additional specifications, contact us through the customer service icon at the bottom right of the page.
Log in to the NineData Console.
In the left navigation bar, click the data replication entry.
On the replication task list page, click the button to create a replication task.
On the first tab, configure the parameters described below, then click the button to proceed to the next step.
Parameter descriptions:
- Name: Enter a meaningful name for the data synchronization task; a name that is easy to identify and manage is recommended. Maximum 64 characters.
- Source Data Source: The data source where the synchronization object is located. Choose the Kafka data source containing the data to be copied.
- Target Data Source: The data source that receives the synchronization object. Choose the target MySQL data source.
- Topic: Select the Kafka topic to be copied.
- Message Format: Choose the format in which data is copied to MySQL. Currently, only Canal-JSON is supported.
- Start Position: Choose where in the Kafka topic the copy starts (see the consumer sketch after this list):
  - Earliest: Find the earliest message in the Kafka topic and copy messages to MySQL in order from there.
  - Latest: Ignore the existing data in the Kafka topic and copy only new messages produced after the task starts.
  - Custom offset: Specify a message position (offset number) in the Kafka topic; the system starts copying messages in order from that offset.
- Existing Data Handling: How the task treats data already present in the target table:
  - Stop the task if data is detected in the target table during the pre-check stage.
  - Ignore the data detected in the target table during the pre-check stage and append the copied data.
  - Delete the data detected in the target table during the pre-check stage and rewrite it.
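The three start-position choices map onto standard Kafka consumer seek operations. A minimal illustration with the kafka-python client, where the broker address, topic name, and offset are hypothetical:

```python
from kafka import KafkaConsumer, TopicPartition  # pip install kafka-python

# Hypothetical broker and topic; replace with your own.
consumer = KafkaConsumer(bootstrap_servers="kafka.example.com:9092")
tp = TopicPartition("orders", 0)
consumer.assign([tp])

consumer.seek_to_beginning(tp)   # earliest: replay from the oldest retained message
consumer.seek_to_end(tp)         # latest: skip history, read only new messages
consumer.seek(tp, 42)            # custom: start from a specific offset number
```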
On the next tab, configure the parameters below, then click the button to proceed.
Parameter descriptions:
- Destination: Choose the storage location for the Kafka data copied to MySQL. You can include all databases in MySQL, or select only the locations where the Kafka data should be stored: select the desired location(s) from the list and click the right-arrow (>) button to add them to the list on the right.

On the mapping tab, you can individually configure the mapping between the key-value pairs in Kafka messages and the columns of each MySQL table. Any Kafka message that meets the configured conditions is saved to the corresponding location in MySQL. For more information, refer to the Appendix.
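For orientation, a Canal-JSON change event carries the source database and table names alongside the row data, which is what the `$.database` and `$.table` mapping conditions match on. A simplified, hand-written illustration (not actual NineData output; real events carry additional metadata such as timestamps and primary-key information):

```python
import json

# Simplified Canal-JSON change event with hypothetical field values.
event = json.loads("""
{
  "database": "sample_employees_210",
  "table": "departments",
  "type": "INSERT",
  "data": [{"dept_no": "d010", "dept_name": "Analytics"}]
}
""")
# $.database and $.table select the target; keys under $.data map to columns.
print(event["database"], event["table"], event["data"][0]["dept_name"])
```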
Tip: When Message Format is Canal-JSON, you need to configure the mapping of the `$.database` and `$.table` fields for each MySQL database and table. For example, if you configure `$.database` as `sample_employees_210` and `$.table` as `departments`, all Kafka messages that meet this condition are copied to the corresponding columns of that table in the target MySQL database.

On the pre-check tab, wait for the system to complete the pre-check. After the pre-check passes, click the button to start the task.
Tip: If the pre-check fails, click the link to the right of the failed check item, troubleshoot the cause manually, and then re-run the pre-check until it passes.
For check items flagged with a warning result, you can fix or ignore them based on the specific situation.
On the final page, when the system prompts that the task was created successfully, the synchronization task starts running. At this point, you can perform the following operations:
- View the execution status of each stage of the synchronization task.
- Return to the task list page.
Viewing Sync Results
Log in to the NineData Console.
In the left navigation bar, click the data replication entry.
On the replication task list page, click the ID of the target sync task to open its details page. The page is explained below.
1. Sync Delay: The data synchronization delay between the source and target data sources. 0 seconds means there is no delay between the two ends, indicating that Kafka currently has no unconsumed data. (You can also spot-check the target tables directly, as sketched after this list.)
2. Configure Alerts: After you configure alerts, the system notifies you through your chosen method if the task fails. For more information, see Operation and Maintenance Monitoring Introduction.
3. More (task actions):
   - Pause: Pause the task. Only running tasks can be paused.
   - Terminate: End a task that is unfinished or listening (i.e., in incremental synchronization). A terminated task cannot be restarted, so proceed with caution.
   - Delete: Delete the task. A deleted task cannot be recovered, so proceed with caution.
4. Incremental Copy: Displays the monitoring indicators for incremental copy. Click the log entry on the right side of the page to view the execution log of the incremental copy, or click the refresh icon to view the latest information.
5. Modify Object: Displays the modification records of the synchronization objects. Click the entry on the right side of the page to reconfigure the synchronization objects, or click the refresh icon to view the latest information.
6. More: Displays detailed information about the current replication task, including the Kafka Topic and Message Format currently being synchronized, and so on.
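Besides the console view, you can spot-check the replicated rows directly on the target MySQL instance. A minimal sketch, reusing the hypothetical sample_employees_210.departments table and placeholder connection settings from earlier:

```python
import pymysql  # pip install pymysql

# Hypothetical connection settings; replace with your target MySQL instance.
conn = pymysql.connect(host="mysql.example.com", user="replica",
                       password="***", database="sample_employees_210")
with conn.cursor() as cur:
    # A quick row-count sanity check that messages are landing;
    # compare against the consumed offset range of the Kafka topic.
    cur.execute("SELECT COUNT(*) FROM departments")
    print("replicated rows:", cur.fetchone()[0])
conn.close()
```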
Appendix: Definition of JSON_PATH
Kafka to MySQL replication is achieved by extracting data from Kafka JSON objects and storing it in the corresponding databases and tables in MySQL. JSON_PATH is a path expression used to locate specific elements in a JSON object. This section introduces the definition of JSON_PATH.
JSON_PATH consists of two main parts: the scope and the pathExpression.
- Scope: Represented by the dollar sign (`$`), it signifies the root of the JSON document.
- pathExpression: Used to match the path of specific elements in the JSON object. It consists of one or more pathLegs, each of which is a member, an arrayLocation, or a doubleAsterisk; multiple pathLegs represent multiple levels of nesting.
  - member: Matches a key in the JSON object. The format is `.<Key Name>` or `.*`, where the former looks up the specified key name in the JSON object and the latter matches all keys in the JSON object.
  - arrayLocation: Matches array elements in the JSON object. The format is `[<non-negative integer>]` or `[*]`, where the former selects the array element at the specified index and the latter matches all array elements in the JSON object.
  - doubleAsterisk: Matches keys or array positions at any level. The format is `.**`.
Examples (a minimal resolver sketch follows this list):
- `$.data`: Matches the element named data at the root of the JSON object.
- `$.data.order_id`: Matches the key named order_id under the data object.
- `$[1]`: Matches the array element with index 1 in the JSON object.
- `$[*]`: Matches all array elements in the JSON object.
- `$.**`: Matches any key or array position at any level in the JSON object.
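To make these semantics concrete, here is a minimal resolver sketch covering the member and arrayLocation legs (doubleAsterisk omitted for brevity). It is an illustration of the path rules above, not NineData's implementation:

```python
import json
import re

def resolve(path, doc):
    """Resolve a simplified JSON_PATH (member and arrayLocation legs only).
    Returns a list, since wildcard legs can match multiple elements."""
    if not path.startswith("$"):
        raise ValueError("JSON_PATH must start with the $ scope marker")
    legs = re.findall(r"\.\w+|\.\*|\[\d+\]|\[\*\]", path[1:])
    matches = [doc]
    for leg in legs:
        nxt = []
        for node in matches:
            if leg == ".*" and isinstance(node, dict):
                nxt.extend(node.values())          # .* matches every key
            elif leg == "[*]" and isinstance(node, list):
                nxt.extend(node)                   # [*] matches every element
            elif leg.startswith(".") and isinstance(node, dict):
                if leg[1:] in node:
                    nxt.append(node[leg[1:]])      # .<Key Name> matches one key
            elif leg.startswith("[") and isinstance(node, list):
                idx = int(leg[1:-1])
                if idx < len(node):
                    nxt.append(node[idx])          # [<n>] matches one index
        matches = nxt
    return matches

doc = json.loads('{"data": [{"order_id": 7}, {"order_id": 8}]}')
print(resolve("$.data[0].order_id", doc))   # [7]
print(resolve("$.data[*].order_id", doc))   # [7, 8]
```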