In Part 1 of this series, we discussed the high-level architecture of the multi-threaded full load and change data capture (CDC) settings, and how to tune the related parameters for better performance when replicating data to an Amazon Kinesis Data Streams target using AWS Database Migration Service (AWS DMS). In Part 2, we provided examples of how adjusting the multi-threaded settings yields different results. In this post, we discuss other key considerations when using Kinesis Data Streams as a target.
Prerequisites
To follow along with this post, you should have familiarity with the following AWS services:
AWS DMS
Amazon Kinesis Data Streams
Amazon RDS for PostgreSQL
Use a VPC interface endpoint to keep end-to-end communication private
Starting with version 3.4.7, AWS DMS supports communication with services such as Amazon Kinesis, Amazon Simple Storage Service (Amazon S3), Amazon DynamoDB, and Amazon Redshift over Amazon Virtual Private Cloud (Amazon VPC) endpoints. As shown in the following diagram, traffic between the AWS DMS replication task and the Kinesis target stays private, which provides consistent bandwidth and throughput and keeps end-to-end communication off the public internet.
To set up a VPC interface endpoint for Kinesis, complete the following steps:
On the Amazon VPC console, choose the same AWS Region as your AWS DMS replication instance.
In the navigation pane, choose Endpoints.
Choose Create endpoint.
For Service, choose the kinesis-streams service. For example, com.amazonaws.us-east-1.kinesis-streams.
For VPC, choose the same VPC as your AWS DMS replication instance to create the endpoint.
For Subnets, choose the same subnets that you used to deploy the AWS DMS replication instance.
For Security groups, choose the same security group that you used to deploy the AWS DMS replication instance.
To specify access control, under Policy, choose Full access, or if you want to use a policy creation tool to specify your own access control, choose Custom.
For Endpoints, verify that your newly created VPC endpoint status is Available.
Choose an appropriate candidate on the source as the partition key
In Kinesis, streams are made up of shards, and records are ordered per shard. AWS DMS uses table mapping rules to map data from the source to the target Kinesis data stream. The possible values of partition-key-type for data records are schema-table, transaction-id, primary-key, constant, and attribute-name. You need to choose the correct mapping rule to make sure that data is migrated efficiently and to avoid target latency.
In this scenario, we migrate data from a table called employee and demonstrate how choosing one column over the other impacts the latency of the AWS DMS task. Complete the following steps:
Use the following table DDL on your source (PostgreSQL) instance to create the employee table:
create table employee (
    employee_id serial not null primary key,
    employee_name varchar(64),
    dept_no varchar(3)
);
Create a Kinesis stream and set capacity mode to on-demand.
Create an AWS DMS task using the option Migrate existing data and replicate ongoing changes or Replicate data changes only.
Set up the AWS DMS task with the following table mapping rule to migrate the employee table under the public schema. Set ParallelApplyThreads to 32, ParallelApplyQueuesPerThread to 512, and partition-key-type as attribute-name, and select dept_no from the table to distribute the data across the shards. The following mapping rule makes sure that all employees' data is grouped into the same shard if they belong to the same department:
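The original mapping rule isn't reproduced here, so the following is an illustrative sketch of what such a rule can look like, based on the object-mapping format AWS DMS uses for Kinesis targets; verify the exact mapping-parameters keys (in particular partition-key-name and the attribute mapping) against the AWS DMS documentation for your version:

{
  "rules": [
    {
      "rule-type": "selection",
      "rule-id": "1",
      "rule-name": "include-employee",
      "rule-action": "include",
      "object-locator": {
        "schema-name": "public",
        "table-name": "employee"
      }
    },
    {
      "rule-type": "object-mapping",
      "rule-id": "2",
      "rule-name": "map-employee-to-kinesis",
      "rule-action": "map-record-to-record",
      "object-locator": {
        "schema-name": "public",
        "table-name": "employee"
      },
      "mapping-parameters": {
        "partition-key-type": "attribute-name",
        "partition-key-name": "dept_no",
        "attribute-mappings": [
          {
            "target-attribute-name": "dept_no",
            "attribute-type": "scalar",
            "attribute-sub-type": "string",
            "value": "${dept_no}"
          }
        ]
      }
    }
  ]
}

ParallelApplyThreads and ParallelApplyQueuesPerThread are set in the TargetMetadata section of the task settings, for example:

"TargetMetadata": {
  "ParallelApplyThreads": 32,
  "ParallelApplyQueuesPerThread": 512
}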
During the ongoing replication phase, insert the following record in the PostgreSQL database:
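The exact DML isn't shown in the original post; a representative statement, assuming the employee table definition above, is the following:

insert into employee (employee_name, dept_no)
select 'employee_' || g, '1'
from generate_series(1, 1000000) as g;
-- 1 million rows, all assigned to department 1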
In the preceding DML statement, you insert 1 million records into the employee table and assign each employee to department number 1. In this example, all 1 million records will be moved into the same Kinesis shard because they belong to the same department (1).
Now you can investigate how the records will be migrated based on the mapping rule. To do so, you can use Amazon CloudWatch metrics. For more information, refer to Replication task metrics.
As shown in the following figure of the CDCIncomingChanges metric, 1 million changes were inserted, and it took about 3 hours to process them, giving a throughput of about 92 records per second.
The records are accumulating on the underlying storage of the AWS DMS replication instance and are waiting to be applied on the target, as shown by the CDCChangesDiskTarget metric.
The CDCLatencyTarget metric shows that the target latency is building up to 9,893 seconds. Although you chose the on-demand mode of the data stream, the target latency is increasing because you’re inserting all the employees with the same department number and therefore moving data into the same shard.
Now you can repeat the same exercise, but enter the following mapping rule, where you choose employee_id in the attribute mappings instead of dept_no:
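Only the partition key attribute changes. As a sketch (with the same caveats as the earlier rule), the mapping-parameters section would now reference employee_id instead of dept_no:

"mapping-parameters": {
  "partition-key-type": "attribute-name",
  "partition-key-name": "employee_id",
  "attribute-mappings": [
    {
      "target-attribute-name": "employee_id",
      "attribute-type": "scalar",
      "attribute-sub-type": "string",
      "value": "${employee_id}"
    }
  ]
}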
We can conclude that changing the attribute from dept_no to employee_id distributes the data in an optimal manner across the shards. The CDCLatencyTarget metric shows that the latency reduced from 9,893 seconds to 2.4 seconds as a result of the change. One million records took 2.4 seconds to process, giving a throughput of 416,666 records/second.
Therefore, it’s important to understand the nature of DML statements on the source and choose an appropriate setting for partition-key-type.
If you have several tables on the source and each table has a limited range for a primary key, use the endpoint setting PartitionIncludeSchemaTable, which prefixes schema and table names to partition values. Doing this increases data distribution among Kinesis shards. For example, suppose that a sysbench schema has thousands of tables and each table has only limited range for a primary key. In this case, the same primary key is sent from thousands of tables to the same shard, which causes throttling.
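As a sketch, PartitionIncludeSchemaTable is enabled in the KinesisSettings of the AWS DMS target endpoint; the stream and role ARNs below are placeholders:

{
  "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/your-stream",
  "ServiceAccessRoleArn": "arn:aws:iam::123456789012:role/your-dms-kinesis-access-role",
  "MessageFormat": "json",
  "PartitionIncludeSchemaTable": true
}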
Provision a sufficient number of shards
In Kinesis, the maximum throughput depends on the number of shards provisioned for the stream. Each shard supports up to 1 MB/second or 1,000 records/second of write throughput and up to 2 MB/second of read throughput.
In this scenario, you migrate data from the employee table you created earlier and demonstrate how not provisioning sufficient shards on Kinesis impacts the AWS DMS task. Complete the following steps:
Create a Kinesis data stream and choose the provisioned capacity mode. In this example, you provision eight shards on the target, which supports a maximum write throughput of 8,000 records/second.
Create an AWS DMS task using the option Migrate existing data and replicate ongoing changes or Replicate data changes only.
Set up the AWS DMS task to migrate the employee table under the public schema, using the same table mapping rule as in the previous section: partition-key-type set to attribute-name with employee_id selected from the table to distribute the data uniformly across the shards.
During the ongoing replication phase, run the following command to insert 10 million records into the employee table:
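The exact command isn't shown in the original post; a representative statement, assuming the same employee table, is the following:

insert into employee (employee_name, dept_no)
select 'employee_' || g, ((g % 5) + 1)::varchar
from generate_series(1, 10000000) as g;
-- 10 million rows spread across five department numbers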
Because you haven’t provisioned sufficient shards on Kinesis, the CDCChangesDiskTarget metrics show that rows are accumulating on disk and waiting to be committed.
You can find the following snippet in the CloudWatch logs, which shows that reading from the source is paused when the swap space exceeds 1 GB, thereby slowing down the replication:
2023-06-06T15:00:47 [SORTER ]I: Reading from source is paused. Total storage used by swap files exceeded the limit 1048576000 bytes (sorter_transaction.c:110)
We can conclude that in addition to choosing an efficient partition-key-type setting, you need to provision sufficient shards on Kinesis to avoid swapping on the AWS DMS replication instance and ensure that the data is loaded in an optimal manner.
For monitoring purposes, use the CloudWatch metric WriteProvisionedThroughputExceeded to monitor the number of records rejected due to throttling for the stream or shard over the specified time period. This metric includes throttling of the PutRecord and PutRecords operations issued by the AWS DMS tasks. AWS DMS automatically retries PutRecord and PutRecords, with a 5-minute timeout in version 3.4.7 and a 30-minute timeout in version 3.5.1. When write throughput is frequently throttled due to insufficient capacity on the Kinesis side and the retry timeout is exceeded, data can be missed during migration. Choose between on-demand and provisioned mode for your data streams, and ensure a sufficient number of shards when using provisioned mode.
Payload message should not exceed 1 MB
In Kinesis, the maximum size of a record's data payload is 1 MB. In this scenario, you migrate data from a table called sample and observe what happens when you migrate a large object (LOB) exceeding 1 MB. Complete the following steps:
Use the following table DDL on the source (PostgreSQL) instance to create the sample table:
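The original DDL isn't shown; a minimal sketch, assuming a single text column to hold the large object, is the following:

create table sample (
    id serial not null primary key,
    data text
);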
Create a Kinesis data stream and choose the provisioned capacity mode. In this example, you provision eight shards on the target, which supports a maximum write throughput of 8,000 records/second.
Create an AWS DMS task using the option Migrate existing data and replicate ongoing changes or Replicate data changes only.
Choose one of the partition-key-type settings available when defining the object mapping.
During the ongoing replication phase, run the following command to insert some random text into the sample table:
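A representative statement, assuming the sample table sketch above, is the following:

insert into sample (data)
select string_agg(md5(random()::text), '')
from generate_series(1, 40000);
-- 40,000 concatenated 32-character md5 strings is roughly 1.28 MB, which exceeds the 1 MB Kinesis limit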
The following CloudWatch snippet shows that AWS DMS skipped the record because the data exceeded 1 MB:
2023-10-26T19:33:44 [TARGET_LOAD ]W: Payload size exceeds Kinesis limit. Skip this record. (kinesis_records.c:90)
Make sure that the total size of the record inserted into Kinesis does not exceed 1 MB.
You can run the following query to check the size of the LOB column:
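The following is a sketch of such a query, assuming the sample table and data column from the earlier example. Note that octet_length reports the uncompressed byte length, which is what counts toward the Kinesis payload limit:

select id, octet_length(data) as data_size_bytes
from sample
order by data_size_bytes desc;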
If the LOB column exceeds 1 MB, you can choose one of two options:
Exclude the LOB column while migrating the data
Choose a target other than Kinesis that doesn’t have LOB restrictions
Out-of-order data and deduplication
In Kinesis, records can be inserted using the PutRecord or the PutRecords API. In AWS DMS, when you use multi-thread settings, you use the PutRecords API. PutRecords doesn’t guarantee the ordering of records. If you need to read records in the same order they are written to the stream, use PutRecord instead of PutRecords, and write to the same shard.
In this scenario, you migrate 10 records from the employee table you created earlier and demonstrate how data is distributed across the shards and the order in which they are delivered to the shard. Complete the following steps:
Create a Kinesis data stream and choose the provisioned capacity mode. In this example, you provision eight shards on the target, which supports a maximum write throughput of 8,000 records/second.
Create an AWS DMS task using the option Migrate existing data and replicate ongoing changes or Replicate data changes only.
Set up the AWS DMS task to migrate the employee table under the public schema, using the same table mapping rule as before: partition-key-type set to attribute-name with employee_id selected from the table to distribute the data uniformly across the shards.
During the ongoing replication phase, insert the following 10 records in the PostgreSQL database:
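A representative statement, assuming the same employee table, is the following:

insert into employee (employee_name, dept_no)
select 'employee_' || g, ((g % 3) + 1)::varchar
from generate_series(1, 10) as g;
-- 10 rows spread across three department numbers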
Query the replication slot using the test_decoding plugin and the pg_logical_slot_peek_changes function to see the timestamp of the data committed on the source:
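The following is a sketch of such a query; the slot name is a placeholder and can be looked up in pg_replication_slots:

-- identify the replication slot that the AWS DMS task created
select slot_name, plugin from pg_replication_slots;

-- peek at the pending changes, including the commit timestamp of each transaction
select * from pg_logical_slot_peek_changes('<your_dms_slot_name>', null, null, 'include-timestamp', 'on');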
By investigating the JSON data in the shards, you can see that employee_id 1 and 2 were written to shard 34, employee_id 3 and 5 to shard 35, and employee_id 4 to shard 33, although it was inserted after employee_id 1 and 2.
See the following data for shardId-000000000034:
See the following data for shardId-000000000033:
See the following data for shardId-000000000035:
We can conclude that the commit timestamp did not influence how the data was distributed across the shards. To ensure ordering, you must load data without using parallel load, although this can lead to higher replication latency. In addition, you need to set partition-key-type as schema-table, constant, or attribute-name to push the data into the same shard.
NULLABLE columns
Make sure that the partition-key-type setting in the object mapping doesn’t have NULL values on the source. In this scenario, you migrate a record from a table called customer and demonstrate how partition-key-value is defined on the target for records that contain a NULL value. Complete the following steps:
Use the following table DDL on the source (PostgreSQL) instance to create the customer table:
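The original DDL isn't shown; a minimal sketch, assuming an email column (used later as the partition key attribute) plus a primary key, is the following:

create table customer (
    customer_id serial not null primary key,
    customer_name varchar(64),
    email varchar(128)
);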
Create a Kinesis data stream and choose the provisioned capacity mode. In this example, you provision eight shards on the target, which supports a maximum write throughput of 8,000 records/second.
Create an AWS DMS task using the option Migrate existing data and replicate ongoing changes or Replicate data changes only.
Set up the AWS DMS task with the following table mapping rule to migrate the customer table under the public schema. Set partition-key-type as attribute-name and select the email column from the table to distribute the data across the shards:
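As a sketch (with the same caveats as the earlier rules), the mapping-parameters section could define a target attribute named root_key that carries the email value; the name root_key is an assumption, chosen to match the attribute name reported in the CloudWatch warning shown later:

"mapping-parameters": {
  "partition-key-type": "attribute-name",
  "partition-key-name": "root_key",
  "attribute-mappings": [
    {
      "target-attribute-name": "root_key",
      "attribute-type": "scalar",
      "attribute-sub-type": "string",
      "value": "${email}"
    }
  ]
}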
During the ongoing replication phase, insert the following record in the PostgreSQL database:
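A representative statement, assuming the customer table sketch above, is the following:

insert into customer (customer_name) values ('John Doe');
-- the email column is not supplied, so its value is NULL on the source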
In the preceding DML statement, you insert one record but don’t populate the email column, meaning the value is NULL on the source.
You can see the following snippet in the CloudWatch logs:
2023-06-06T21:46:04 [TARGET_APPLY ]W: insert record has empty value for table: public.CustomerData attribute-name: root_key for partition key. Using default partition key:
AWS DMS chose a default partition key and assigned a partition-key-value of schema-name.target-table-name.primary-key (public.CustomerData.3), as seen in the following code, when retrieving the record from Kinesis:
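A trimmed GetRecords response for that record might look like the following, with the sequence number, arrival timestamp, and data payload abbreviated:

{
    "Records": [
        {
            "SequenceNumber": "...",
            "ApproximateArrivalTimestamp": "...",
            "Data": "...",
            "PartitionKey": "public.CustomerData.3"
        }
    ]
}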
You must make sure that the partition key does not have NULL values, because this can affect the partitioning of the data on Kinesis.
Clean up
Complete the following steps to clean up your resources:
On the AWS DMS console, delete any AWS DMS tasks that you set up.
Delete the Kinesis data streams that you set up.
Run the following commands on the source (PostgreSQL) to delete the tables:

DROP TABLE employee;
DROP TABLE sample;
DROP TABLE customer;
Delete the database if you no longer need it.
Conclusion
In this blog series, we discussed how you can tune AWS DMS replication performance for Amazon Kinesis. In Part 1, we discussed the multi-threaded architecture of full load and CDC. In Part 2, we demonstrated how adjusting those settings affects the results. In this post, we discussed key considerations and best practices when replicating data to Kinesis using AWS DMS. By understanding how the AWS DMS task and Amazon Kinesis are configured, you can optimize the replication of data to Amazon Kinesis.
We welcome your feedback. If you have any questions or suggestions, leave them in the comments section.
About the Authors
Siva Thang is a Senior Solutions Architect, Partners with AWS. His specialty is in databases and analytics, and he also holds a master’s degree in Engineering. Siva is deeply passionate about helping customers build a modern data platform in the cloud that includes migrating to modern relational databases and building data lakes and data pipelines at scale for analytics and machine learning. Siva also likes to present in various conferences and summits on the topic of modern databases and analytics.
Suchindranath Hegde is a Data Migration Specialist Solutions Architect at Amazon Web Services. He works with our customers to provide guidance and technical assistance on data migration into the AWS Cloud using AWS DMS.
Wanchen Zhao is a Senior Database Specialist Solutions Architect at AWS. Wanchen specializes in Amazon RDS and Amazon Aurora, and is a subject matter expert for AWS DMS. Wanchen works with SI and ISV partners to design and implement database migration and modernization strategies and provides assistance to customers for building scalable, secure, performant, and robust database architectures in the AWS Cloud.
Michael Newlin is a Cloud Support DBE with Amazon Web Services and Subject Matter Expert for AWS DMS. At AWS, he works with customers and internal teams to ensure smooth and fast transitions of database workloads to AWS.
Jay Chen is a Software Development Manager at AWS DMS, where he oversees the development of DMS endpoints, including S3, Kinesis, Kafka, OpenSearch, Timestream, and others. Jay is widely recognized for his significant contributions to the database field. He co-authored the Star Schema Benchmark, which is a standard benchmark based on the TPC-H benchmark for OLAP databases. Moreover, he contributed as a co-author to C-Store: A Column-oriented DBMS.