Oracle Migration Synchronization to Kafka
NineData data replication supports replicating data from Oracle to Kafka, achieving real-time data flow and distribution.
Background Information
Kafka is a distributed message queue with characteristics such as high throughput, low latency, high reliability, and scalability, making it suitable for real-time data processing and distribution scenarios. Replicating data from Oracle to Kafka can build a real-time data stream processing system to meet the needs of real-time data processing, data analysis, data mining, and business monitoring. In addition, replicating data from Oracle to Kafka can also achieve data decoupling, thereby reducing the dependence and coupling between systems, making the system easier to maintain and expand.
The data replicated from Oracle to Kafka will be stored in the form of a JSON array. For details of the fields contained in the JSON array, please refer to Appendix.
Prerequisites
The source and target data sources have been added to NineData. For how to add them, please refer to Create Data Source.
The source database type is Oracle, with the following version and architecture specifications:
Supported versions: 23c, 21c, 19c, 18c, 12c, 11g
Supported architectures: Non-CDB, CDB
The target database type is Kafka version 0.10 and above.
Please ensure the following configurations have been made in the Oracle data source:
The Oracle log mode has been set to
ARCHIVELOG
mode (default isNOARCHIVELOG
).To verify the current log mode: SELECT log_mode FROM v$database;
If the log mode is NOARCHIVELOG, perform the following steps:
1. Shut down the database: SHUTDOWN IMMEDIATE;
2. Start and mount the database: STARTUP MOUNT;
3. Execute the archive command: ALTER DATABASE ARCHIVELOG;
4. Start the database: ALTER DATABASE OPEN;Supplemental Log has been enabled, i.e., additional logs (not enabled by default).
To verify if the supplemental log is enabled: SELECT supplemental_log_data_all allc FROM v$database;
If it returns NO, execute the command to enable the supplemental log:
- ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
//Enable supplemental logs at the database level, which will affect all tables and all columns, consuming more resources.
- ALTER TABLE <database_name>.<table_name> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
- ALTER DATABASE ADD SUPPLEMENTAL LOG DATA;
//Enable supplemental logs on a specific table, which will affect all columns in that table.
Usage Limitations
- Before performing data synchronization, it is necessary to assess the performance of the source and target data sources, and it is recommended to perform data synchronization during off-peak business hours. Otherwise, the full data initialization will occupy a certain amount of read and write resources of the source and target data sources, leading to increased database load.
- It is necessary to ensure that each table in the synchronization object has a primary key or unique constraint, and the column names are unique, otherwise, the same data may be synchronized repeatedly.
Operation Steps
NineData’s data replication product has been commercialized. You can still use 10 replication tasks for free, with the following considerations:
Among the 10 replication tasks, you can include 1 task, with a specification of Micro.
Tasks with a status of do not count towards the 10-task limit. If you have already created 10 replication tasks and want to create more, you can terminate previous replication tasks and then create new ones.
When creating replication tasks, you can only select the you have purchased. Specifications that have not been purchased will be grayed out and cannot be selected. If you need to purchase additional specifications, please contact us through the customer service icon at the bottom right of the page.
Log in to the NineData Console.
Click on > in the left navigation bar.
On the page, click on in the top right corner.
On the tab, configure according to the table below and click .
Parameter Description Enter the name of the data synchronization task. To facilitate subsequent search and management, please use meaningful names as much as possible. Up to 64 characters are supported. The data source where the synchronization object is located, select the data source where the data to be copied is located. The data source that receives the synchronization object, select the target Kafka data source. Select the target Kafka Topic (topic), the data from the source data source will be written into the specified Topic. When delivering data to a Topic, you can specify which partition of the Topic to deliver the data to. - : Deliver all data to the default partition 0.
- : Distribute data to different partitions, the system will use the hash value of the database name and table name to calculate which partition the target data should be delivered to, ensuring that the same table's data is delivered to the same partition during the hash delivery process.
Supports and two types of . - : Copy all objects and data from the source data source, i.e., full data replication. The switch on the right is the switch for periodic full replication, more information, please refer to Periodic Full Replication.
- : After the full replication is completed, incremental replication is performed based on the logs of the source data source.
(not selectable only under ) Optional only when the task contains or .
The specifications of the replication task determine the replication speed: the larger the specification, the higher the replication rate. Hover over the icon to view the replication rate and configuration details for each specification. Each specification displays the available quantity and the total number of specifications. If the available quantity is 0, it will be grayed out and cannot be selected.
Required when is only . - : Perform incremental replication based on the current replication task start time.
- : Select the start time of incremental replication, you can choose the time zone according to your business region. If the time point is configured before the current replication task starts, the replication task will fail if there are DDL operations during that time period.
- : Stop the task if data is detected in the target table during the pre-inspection phase.
- : Ignore the data if it is detected in the target table during the pre-inspection phase, and append other data.
- : Delete the data if it is detected in the target table during the pre-inspection phase, and rewrite it.
On the tab, configure the following parameters, and then click .
Parameter Description Select the content to be replicated, you can choose to replicate all content of the source library, or you can choose , select the content to be replicated in the list, and click > to add to the right list. If you need to create multiple replication chains with the same replication object, you can create a configuration file and import it when creating a new task. Click on in the top right corner, then click Download Template to download the configuration file template to your local machine, edit it, and then click to upload the configuration file to achieve batch import. Configuration file description:
Parameter Description source_table_name
The source table name where the object to be synchronized is located. destination_table_name
The target table name where the object to be synchronized is received. source_schema_name
The source Schema name where the object to be synchronized is located. destination_schema_name
The target Schema name where the object to be synchronized is received. source_database_name
The source library name where the object to be synchronized is located. target_database_name
The target library name where the object to be synchronized is received. column_list
The list of fields to be synchronized. extra_configuration
Additional configuration information, you can configure the following information here: - Field mapping:
column_name
,destination_column_name
- Field value:
column_value
- Data filtering:
filter_condition
tipAn example of
extra_configuration
is as follows:{
"column_name": "created_time", //Specify the original column name for column name mapping
"destination_column_name": "migrated_time", //Map the target column name to "migrated_time"
"column_value": "current_timestamp()", //Change the field value of the column to the current timestamp
"filter_condition": "id != 0" //Only rows with ID not equal to 0 will be synchronized.
}
- For the overall example content of the configuration file, please refer to the downloaded template.
- Field mapping:
On the tab, you can configure each column that needs to be replicated to Kafka individually, by default, all columns of the selected table will be replicated. If there are updates in the source and target data sources during the configuration mapping phase, you can click the button in the top right corner of the page to refresh the information of the source and target data sources. After the configuration is completed, click .
On the tab, wait for the system to complete the pre-inspection, and after the pre-inspection is passed, click .
tipIf the pre-inspection does not pass, you need to click on the in the column on the right side of the target inspection item, troubleshoot the cause of the failure, manually fix it, and then click to re-execute the pre-inspection until it passes.
If the is for the inspection item, it can be fixed or ignored according to the specific situation.
On the page, it prompts , and the synchronization task starts running. At this time, you can perform the following operations:
- Click to view the execution of each stage of the synchronization task.
- Click to return to the task list page.
View Synchronization Results
Log in to NineData Console.
Click on > in the left navigation bar.
On the page, click the ID of the target synchronization task to open the page, the page description is as follows.
No. Feature Description 1 Synchronization Delay The data synchronization delay between the source and target data sources, 0 seconds indicates no delay between the two ends, meaning that the data on the Kafka side has currently caught up with the source side. 2 Configure Alerts After configuring alerts, the system will notify you through the method you choose when the task fails. For more information, please refer to Operations Monitoring Introduction. 3 More - : Pause the task, only tasks with the status are selectable.
- : End tasks that are not completed or listening (i.e., in incremental synchronization), after terminating the task, it cannot be restarted, please operate with caution.
- : Delete the task, the task cannot be recovered after deletion, please operate with caution.
4 Full Replication (Displayed in scenarios including full replication) Display the progress and detailed information of full replication. - Click on the on the right side of the page: View various monitoring metrics during the full replication process. During the full replication process, you can also click on the on the right side of the monitoring metrics page to limit the rate of writing to the target data source per second. The unit is MB/S.
- Click on the on the right side of the page: View the execution logs of full replication.
- Click on the icon on the right side of the page: View the latest information.
5 Incremental Replication (Displayed in scenarios including incremental replication) Display various monitoring metrics for incremental replication. - Click on the on the right side of the page: Limit the rate of writing to the target data source per second. The unit is rows/second.
- Click on the on the right side of the page: View the execution logs of incremental replication.
- Click on the icon on the right side of the page: View the latest information.
6 Modify Object Display the modification records of the synchronization object. - Click on the on the right side of the page to configure the synchronization object.
- Click on the icon on the right side of the page: View the latest information.
7 Expand Display detailed information of the current replication task, including , , , etc.
Appendix 1: Data Format Description
The data migrated from Oracle to Kafka will be stored in JSON format. The system will evenly divide the data in Oracle into multiple JSON objects, each representing a message (Message).
- Full replication phase: The number of Oracle data rows stored in a single message is determined by the message.max.bytesmessage.max.bytes is the maximum message size allowed in the Kafka cluster, with a default value of
1000000
bytes (i.e., 1MB). You can adjust this value by modifying themessage.max.bytes
parameter in the Kafka configuration file, so that more Oracle data rows will be stored in each message. Please note that since Kafka needs to allocate memory space for each message, setting this value too high may lead to a decline in Kafka cluster performance. parameter. - Incremental replication phase: A single message stores one row of Oracle data.
Field | Field Type | Field Description | Field Example |
---|---|---|---|
serverId | STRING | The data source information to which the message belongs, in the format: <connection address:port> . | "serverId":"47.98.224.21:3307" |
id | LONG | The Record id of the message. This field is globally incremented and is used as the basis for determining message duplicate consumption. | "Id":156 |
es | INT | Represented by Unix timestamp, it has different meanings in different task stages:
| "es":1668650385 |
ts | INT | The time when more data is delivered to Kafka, represented by Unix timestamp. | "ts":1668651053 |
isDdl | BOOLEAN | Whether the data is DDL, the values are:
| "is_ddl":true |
type | STRING | The type of data, the values are:
| "type":"INIT" |
database | STRING | The database to which the data belongs. | "database":"database_name" |
table | STRING | The table to which the data belongs. If the DDL statement corresponds to a non-table object, this field is set to null . | "table":"table_name" |
mysqlType | JSON | Reserved field, no need to pay attention. | "mysqlType": null |
sqlType | JSON | The data types of each field in the source Oracle. | "sqlType": {"id": "NUMBER","shipping_type":"varchar2(50)"} |
pkNames | ARRAY | The names of the primary keys (Primary Key) corresponding to the records (Record) in the Binlog. Values:
| "pkNames": ["id", "uid"] |
data | ARRAY[JSON] | The data that Oracle delivers to Kafka, stored in a JSON-formatted array.
| "data": [{ "name": "jl", "phone": "(737)1234787", "email": "caicai@yahoo.edu", "address": "zhejiang", "country": "china" }] |
old | ARRAY[JSON] | Records the details of the incremental replication from Oracle to Kafka.
For other operations, the value of this field is null . | "old": [{ "name": "someone", "phone": "(737)1234787", "email": "someone@example.com", "address": "somewhere", "country": "china" }] |
sql | STRING | If the current data is an incremental DDL operation, the corresponding SQL statement is recorded. For other operations, the value of this field is null . | "sql":"create table sbtest1(id int primary key,name varchar(20))" |
Appendix 2: Pre-Check Item List
Check Item | Check Content |
---|---|
Source Data Source Connection Check | Check the gateway status of the source data source, whether the instance is reachable, and the accuracy of the username and password |
Target Data Source Connection Check | Check the gateway status of the target data source, whether the instance is reachable, and the accuracy of the username and password |
Source Database Permission Check | Check whether the account permissions of the source database meet the requirements |
Target Database Permission Check | Check whether the Kafka account has access permission to the Topic |
Target Database Data Existence Check | Check whether there is already data in the Topic |
Source Database Log Archive Mode Check | Whether the source database log archive mode is ARCHIVELOG |
Source Database Supplemental Log Check | The source database supplemental log must be enabled and set to ALL |