AWS Database Migration Service (AWS DMS) makes it possible to replicate to Amazon Kinesis Data Streams from relational databases, data warehouses, NoSQL databases, and other types of data stores. You can use Kinesis data streams to collect and process large streams of data records in real time.
Replicating data changes to a Kinesis data stream is usually expected to be long-running and highly performant. AWS DMS offers multi-threaded full load and change data capture (CDC) task settings to help increase the speed of data transfer and the performance of CDC.
Customers often ask the following questions when tuning performance for a Kinesis Data Streams target:
What are the considerations when tuning ParallelLoad* and ParallelApply* for the Kinesis Data Streams target endpoint?
How do I tune the parallel load and parallel apply parameters (ParallelLoad* and ParallelApply*) of an AWS DMS task for a given source database workload?
Does using more Kinesis data stream shards always improve replication performance?
This series consists of three parts. In this first post, we discuss the multi-threaded full load and CDC settings and considerations for tuning the related parameters for better performance when replicating to a Kinesis Data Streams target. In the second post, we provide a demonstration of AWS DMS performance tuning for a Kinesis Data Streams target with multi-threaded settings. In the third post, we share some other considerations and best practices for using Kinesis Data Streams as a target.
Solution overview
AWS DMS supports full load and CDC from any supported source to a Kinesis Data Streams target via attribute-value pairs in JSON. You can use AWS DMS object mapping to migrate your data from the supported data source to a target Kinesis data stream. A Kinesis data stream is made up of shards. You can define a partition key for each table in a DMS task, which Kinesis Data Streams uses to group the data into its shards.
In the full load phase, as shown in the following figure, AWS DMS loads tables in parallel to the Kinesis Data Streams target. The maximum number of tables loaded in parallel is determined by the AWS DMS task setting MaxFullLoadSubTasks. AWS DMS loads each table into its corresponding target shard using a dedicated subtask. By default, AWS DMS loads eight tables in parallel; the maximum is 49 tables. The default partition key used by a DMS task in the full load phase is primary-key. You can instead set the AWS DMS object mapping parameter partition-key-type to schema-table, which means the same table from the same schema of the source database is loaded to the same shard in Kinesis Data Streams.
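As an illustration, the following is a minimal table mapping sketch that sets partition-key-type to schema-table for a single table. The schema, table, and rule names are hypothetical placeholders, not values from this post.

```python
import json

# Minimal table mapping sketch: route all records of one table to the same
# Kinesis shard by using schema-table as the partition key type.
# The schema and table names below are hypothetical placeholders.
table_mappings = {
    "rules": [
        {
            "rule-type": "selection",
            "rule-id": "1",
            "rule-name": "include-sales-schema",
            "object-locator": {"schema-name": "Sales", "table-name": "%"},
            "rule-action": "include",
        },
        {
            "rule-type": "object-mapping",
            "rule-id": "2",
            "rule-name": "map-orders-to-kinesis",
            "rule-action": "map-record-to-record",
            "object-locator": {"schema-name": "Sales", "table-name": "Orders"},
            "mapping-parameters": {"partition-key-type": "schema-table"},
        },
    ]
}

# The JSON string can be supplied as the TableMappings parameter when creating
# or modifying the AWS DMS task (for example, with boto3 or the AWS CLI).
print(json.dumps(table_mappings, indent=2))
```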
When the parallel load setting is used in the full load phase, or the parallel apply setting is used in the CDC phase, AWS DMS loads data and applies changes using multiple threads. You can use the AWS DMS object mapping parameter partition-key-type to set the partition key for the target. By default, with the ParallelLoad* or ParallelApply* task settings specified, the primary key of the table is used as the partition key in the full load or CDC phase, respectively. When bulk data arrives in the full load phase, or serializable change records arrive in the CDC phase, AWS DMS extracts the partition key and applies a hash function to distribute the messages to their corresponding parallel load or apply queues. Depending on the partition key used, especially when its statistical distribution is skewed, some queues can be very busy while others are relatively idle. As shown in the following figure, Qm(n-1)+1 is full, whereas Qm+2 and Qm+m are empty.

The maximum number of records to store in each buffer queue is specified by ParallelLoadBufferSize for the full load phase and ParallelApplyBufferSize for the CDC phase. AWS DMS uses multiple threads to poll from the parallel load or apply queues. The number of threads is determined by ParallelLoadThreads in the full load phase and ParallelApplyThreads in the CDC phase. Each thread polls exclusively from a specific queue or group of queues. The number of queues each thread polls is determined by the AWS DMS task setting ParallelLoadQueuesPerThread in the full load phase and ParallelApplyQueuesPerThread in the CDC phase. The total number of queues in parallel is therefore ParallelLoadQueuesPerThread × ParallelLoadThreads for the full load phase and ParallelApplyQueuesPerThread × ParallelApplyThreads for the CDC phase. Parallel*QueuesPerThread also determines the number of records per batch sent to the Kinesis Data Streams target with each PutRecords call.
For more information about the parallel load and apply task settings, refer to Using Amazon Kinesis Data Streams as a target for AWS Database Migration Service.
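To show where these settings live, the following is a minimal sketch that updates the FullLoadSettings and TargetMetadata sections of an existing task with boto3. The task ARN and the numeric values are placeholders for illustration, not tuning recommendations.

```python
import json
import boto3

dms = boto3.client("dms")

# Sketch of the multi-threaded full load and CDC settings discussed above.
# The numbers below are placeholders that illustrate the parameters.
task_settings = {
    "FullLoadSettings": {
        "MaxFullLoadSubTasks": 8           # tables loaded in parallel (default 8, max 49)
    },
    "TargetMetadata": {
        "ParallelLoadThreads": 8,          # threads polling the full load queues
        "ParallelLoadBufferSize": 1000,    # max records per full load queue
        "ParallelLoadQueuesPerThread": 4,  # queues polled by each full load thread
        "ParallelApplyThreads": 8,         # threads polling the CDC apply queues
        "ParallelApplyBufferSize": 1000,   # max records per CDC apply queue
        "ParallelApplyQueuesPerThread": 4, # queues polled by each CDC apply thread
    },
}

# Apply the settings to an existing task (the ARN is a placeholder).
# The task typically needs to be stopped before its settings can be modified.
dms.modify_replication_task(
    ReplicationTaskArn="arn:aws:dms:us-east-1:123456789012:task:EXAMPLE",
    ReplicationTaskSettings=json.dumps(task_settings),
)
```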
Considerations for parallel load and apply settings
In this section, we discuss key considerations for the parallel load and apply settings.
Determine if the ordering of records needs to be preserved
Rather than preserving strict ordering, Kinesis Data Streams may deliver data from different shards out of order. To achieve high performance, AWS DMS uses the Kinesis bulk PutRecords API as well as multiple threads. If message ordering must be preserved, consider the following options:
If the ordering of all operations associated with the same primary key matters, use primary-key for partition-key-type in AWS DMS object mapping and specify one shard for the Kinesis target without parallel load or apply settings. This way, AWS DMS replicates data records in one thread, in the same serializable order as the transactions associated with the same primary key in the source database, to the same Kinesis shard. Note the Kinesis Data Streams per-shard write limits: each shard supports up to 1,000 records per second for writes, up to a maximum total data write rate of 1 MB per second. For more details, refer to Quotas and Limits.
If the ordering of all operations within an individual table matters, use schema-table for partition-key-type in AWS DMS object mapping and specify one shard for the Kinesis target without parallel load or apply settings. This way, AWS DMS replicates data records in one thread, in the same serializable order as the transactions associated with the same table in the source database, to the same Kinesis shard. Also note the per-shard capacity of the Kinesis target mentioned above, especially when the workload from the source database is intensive.
Use the target endpoint setting IncludeTransactionDetails to provide detailed transaction information from the source database. The downstream application can then use information such as the log position and transaction ID to reconstruct the order of the messages. This approach is suitable when the source database is highly transactional and parallel load and apply settings are used for better performance; a configuration sketch follows this list.
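The following is a minimal sketch, assuming an existing Kinesis target endpoint, of enabling IncludeTransactionDetails with boto3. The ARNs and stream name are placeholders.

```python
import boto3

dms = boto3.client("dms")

# Sketch: enable detailed transaction information (transaction ID, log position,
# and so on) on an existing Kinesis target endpoint. All ARNs are placeholders.
dms.modify_endpoint(
    EndpointArn="arn:aws:dms:us-east-1:123456789012:endpoint:EXAMPLE",
    KinesisSettings={
        "StreamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/example-stream",
        "MessageFormat": "json",
        "ServiceAccessRoleArn": "arn:aws:iam::123456789012:role/dms-kinesis-role",
        "IncludeTransactionDetails": True,
    },
)
```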
Understand the workloads of the source database
The data size and transaction rate determine how much parallelism is needed to achieve the desired replication performance. Typically, in the full load phase, you need to understand the size of each table and its number of rows. In the CDC phase, you need to understand the number of records changed per second rather than the number of transactions per second, because one transaction can contain many records. This information is used to specify the parallel load and apply settings for AWS DMS to improve replication performance.
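As one hedged example for a PostgreSQL source, the following sketch samples pg_stat_user_tables twice to approximate the number of rows changed per second. The connection string is a placeholder, the counters include all user tables (not only replicated ones), and other engines need their own counters.

```python
import time
import psycopg2  # assumes a PostgreSQL source; other engines need different counters

# Total inserted + updated + deleted row counters across user tables.
QUERY = """
    SELECT COALESCE(SUM(n_tup_ins + n_tup_upd + n_tup_del), 0)
    FROM pg_stat_user_tables
"""

def sample_changes(conn):
    with conn.cursor() as cur:
        cur.execute(QUERY)
        return cur.fetchone()[0]

# Placeholder connection string for illustration only.
conn = psycopg2.connect("host=source-db.example.com dbname=appdb user=dms_user password=example")

interval_seconds = 60
first = sample_changes(conn)
time.sleep(interval_seconds)
second = sample_changes(conn)

records_per_second = (second - first) / interval_seconds
print(f"Approximate source change rate: {records_per_second:.0f} records/second")
```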
Determine the number of shards and partition key of the target
For the Kinesis Data Streams target, each shard provides a fixed unit of capacity. You can choose between on-demand mode and provisioned mode for your data streams. If your data rate changes, in provisioned mode, you can increase or decrease the number of shards allocated to your stream. With on-demand mode, Kinesis Data Streams automatically manages the shards in order to provide the necessary throughput. The capacity of the Kinesis target can be the bottleneck of the replication performance; therefore, make sure Kinesis is configured with a sufficient number of shards to handle more parallelism.
The partition key is an essential factor to determine the capacity utilization of the target. Based on the statistical distribution of the partition key in the source database, some shards may be hotter than others. As discussed earlier in this post, the key is also used by AWS DMS to distribute the data records to the parallel load and apply queues.
Two typical scenarios are associated with the partition key:
The distribution of the data is skewed according to the partition key, so a few hot shards can be significantly throttled. In this case, consider a different partition key that distributes the data more evenly among shards.
The data is evenly distributed among shards by the partition key, but the peak workload from the source database exceeds the throughput capacity of the Kinesis shards. In this case, increase the number of shards (for example, double the current number), or consider using on-demand mode for the Kinesis data stream capacity, as shown in the sketch after this list.
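The following minimal sketch shows both options with boto3: doubling the shard count of a provisioned-mode stream, or switching the stream to on-demand capacity mode. The stream name and ARN are placeholders.

```python
import boto3

kinesis = boto3.client("kinesis")

stream_name = "example-dms-target-stream"  # placeholder

# Option 1: double the shard count of a provisioned-mode stream.
current_shards = kinesis.describe_stream_summary(StreamName=stream_name)[
    "StreamDescriptionSummary"
]["OpenShardCount"]
kinesis.update_shard_count(
    StreamName=stream_name,
    TargetShardCount=current_shards * 2,
    ScalingType="UNIFORM_SCALING",
)

# Option 2 (instead of option 1): switch the stream to on-demand capacity mode.
# kinesis.update_stream_mode(
#     StreamARN="arn:aws:kinesis:us-east-1:123456789012:stream/example-dms-target-stream",
#     StreamModeDetails={"StreamMode": "ON_DEMAND"},
# )
```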
Size the replication instance properly for more parallelism
Typically, the more parallelism you use with AWS DMS parallel load and apply settings, the more resources the replication instance needs. Size up the replication instance when increasing MaxFullLoadSubTasks, ParallelLoad*, and ParallelApply*. A general rule is to match the number of vCPUs with MaxFullLoadSubTasks × ParallelLoadThreads × the number of tasks run in parallel for full load, and ParallelApplyThreads × the number of tasks run in parallel for CDC. Alternatively, consider using AWS DMS Serverless, which provides automatic provisioning and scaling; also note its current limitations. Closely monitor resource utilization of the replication instance, such as the Amazon CloudWatch metrics CPUUtilization, FreeableMemory, and SwapUsage, when tuning MaxFullLoadSubTasks, ParallelLoad*, and ParallelApply*.
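To make that rule of thumb concrete, here is a small arithmetic sketch. The function names and numbers are placeholders, and the rule is a rough starting point rather than a sizing guarantee.

```python
# Rough vCPU rule of thumb from this post (a starting point, not a guarantee).
def vcpus_for_full_load(max_full_load_subtasks, parallel_load_threads, tasks_in_parallel):
    return max_full_load_subtasks * parallel_load_threads * tasks_in_parallel

def vcpus_for_cdc(parallel_apply_threads, tasks_in_parallel):
    return parallel_apply_threads * tasks_in_parallel

# Example with placeholder values: one task, 8 subtasks, 8 load threads, 8 apply threads.
print(vcpus_for_full_load(8, 8, 1))  # suggested vCPUs during full load
print(vcpus_for_cdc(8, 1))           # suggested vCPUs during CDC
```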
Example use case for parallel apply settings
Let’s explore an example. Consider the following use case:
We conduct a replication from Amazon Aurora PostgreSQL-Compatible Edition to Kinesis Data Streams using pglogical for WAL decoding
The workload generates 1,600 records per second, with a 1 KB record size
There is no resource contention in the source database and the replication instance
Let’s see how parallel apply settings are specified for this use case.
First, for the Kinesis Data Streams target of the AWS DMS task, you need to determine how many shards are needed. Identify how the data is distributed across shards by the partition key, and the number and size of records written to each shard. By default, AWS DMS uses the primary key of the source table as the partition key.
After determining the number of shards for the target, you can then determine the ParallelApply* settings for the AWS DMS task. For example, suppose we use eight shards for the Kinesis Data Streams target and the data is evenly distributed among the shards according to the primary key of the source database. Typically, you can specify ParallelApplyThreads as a multiple of the number of shards. You can start with the same value as the number of shards and tune later as needed.
Assuming the average round trip time between the replication instance and Kinesis Data Streams target is 20 milliseconds, that is 50 round trips per second (1,000 ms / 20 ms = 50). You can use the AWS DMS diagnostic support AMI or a networking diagnostic command such as hping3 to measure the round trip time between the replication instance and the target endpoint.
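As a hedged alternative to those tools, the following sketch times a few tiny PutRecords calls from the replication network (for example, an EC2 instance in the same subnet as the replication instance) to approximate the round trip. It writes small test records, so point it at a test stream; the stream name is a placeholder.

```python
import time
import boto3

kinesis = boto3.client("kinesis")
stream_name = "example-dms-target-stream"  # placeholder; use a test stream

# Time a few tiny PutRecords calls to approximate the round trip to Kinesis.
# Note: this writes small test records to the stream.
samples = []
for _ in range(10):
    start = time.perf_counter()
    kinesis.put_records(
        StreamName=stream_name,
        Records=[{"Data": b"rtt-probe", "PartitionKey": "rtt-probe"}],
    )
    samples.append((time.perf_counter() - start) * 1000)

avg_rtt_ms = sum(samples) / len(samples)
print(f"Average round trip: {avg_rtt_ms:.1f} ms "
      f"(~{1000 / avg_rtt_ms:.0f} round trips per second)")
```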
The formula of the configuration is the following:
ParallelApplyThreads × ParallelApplyQueuesPerThread × number of round trips per second between the replication instance and the Kinesis data stream >= number of records changed per second in the source database.
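Expressed as a small sketch (the function names are ours, for illustration only):

```python
def estimated_apply_capacity(parallel_apply_threads, queues_per_thread, round_trips_per_second):
    """Rough upper bound on records per second AWS DMS can apply with these settings."""
    return parallel_apply_threads * queues_per_thread * round_trips_per_second

def settings_are_sufficient(capacity, source_records_per_second):
    """True when the estimated capacity covers the source change rate."""
    return capacity >= source_records_per_second
```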
Let’s compare two settings (assuming 50 round trips per second).
Our initial settings are as follows:
ParallelApplyThreads = # of shards = 8
ParallelApplyQueuesPerThread = 4
With the preceding initial settings, AWS DMS will be able to push roughly 8 x 4 x 50 = 1,600 records per second using multiple PutRecords API calls. The 5 MB limit per PutRecords API call also bounds replication throughput. In our case, ParallelApplyQueuesPerThread × record size = 4 x 1 KB = 4 KB per call, which is well within this limit.
Let’s see how different settings can impact our replication throughput:
ParallelApplyThreads = 16
ParallelApplyQueuesPerThread = 1
With these settings, AWS DMS will be able to push roughly 16 x 1 x 50 = 800 records per second.
The first setting satisfies the requirement to replicate 1,600 records per second from the source workload, whereas the second setting is not sufficient even though the number of threads (ParallelApplyThreads) is higher. This is because ParallelApplyQueuesPerThread is left at its default of 1, which is too small in the second setting.
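The same comparison in plain arithmetic, mirroring the formula sketched earlier:

```python
round_trips_per_second = 1000 / 20  # 20 ms average round trip -> 50 per second

initial = 8 * 4 * round_trips_per_second       # 1,600 records/second: meets the workload
alternative = 16 * 1 * round_trips_per_second  # 800 records/second: falls short

print(initial, alternative)
```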
The base assumption is that the data is evenly distributed among the shards by the partition key (the default is the primary key of the source table). If there is a spike because the data is temporarily skewed, adjust ParallelApplyThreads and ParallelApplyQueuesPerThread to handle the larger load. If the data is always skewed toward a certain shard, consider changing to a different partition key. Also note that the payload size of the source workload, combined with the number of queues specified, determines the size of each PutRecords API call, which is limited to 5 MB per call.
General best practices for tuning parallel load and apply settings
Keep in mind the following best practices:
Consider if parallel load and apply settings are suitable for your particular use case when the ordering of records needs to be preserved.
Select the partition key for Kinesis Data Streams wisely, based on your understanding of the statistical distribution of the key in the source data. The goal is to distribute the data evenly among shards. Do not use a unique key of the source table that allows null values for the AWS DMS object mapping parameter partition-key-type or the partition key of the target endpoint.
Choose the proper data stream capacity mode for the target to ensure sufficient capacity. Typically, the CloudWatch metric WriteProvisionedThroughputExceeded for Kinesis Data Streams and the metrics CDCIncomingChanges and CDCLatencyTarget for AWS DMS spike when the target capacity is the bottleneck.
Size up the replication instance for more parallelism, and also consider using AWS DMS Serverless (noting its limitations). Monitor the CloudWatch metrics CPUUtilization, FreeableMemory and SwapUsage for AWS DMS closely when tuning MaxFullLoadSubTasks, ParallelLoad* and ParallelApply*.
Parallel*Threads and Parallel*QueuesPerThread must be tuned together based on the number of records changed in the source database. A balanced batch size per thread is better than a small batch with a large number of threads. Increase ParallelApplyBufferSize when you have more messages to handle per queue, or when some parallel load and apply queues are busier than others because the data is skewed by the partition key (ideally, use a better partition key).
In the CDC phase, closely monitor the CloudWatch metric PutRecords.Records for Kinesis Data Streams and the metrics CDCThroughputRowsSource and CDCThroughputRowsTarget for AWS DMS, and tune ParallelApply* accordingly. The goal is for these three CloudWatch metrics to match; see the monitoring sketch after this list.
Increase the AWS DMS stream buffer when needed, especially when handling large objects or seeing warnings about stream buffer in the AWS DMS log.
Do not use detailed debug logging for the AWS DMS task in production. Intensive detailed debug logging can block replication, cause captured source changes to accumulate on the replication instance, and result in sudden spikes of CDCThroughputRowsSource and CDCThroughputRowsTarget.
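The following minimal sketch pulls the metrics mentioned above with boto3 so they can be compared side by side. The stream name and the DMS task and instance identifiers are placeholders, and the choice of statistic per metric is an assumption for illustration.

```python
from datetime import datetime, timedelta, timezone
import boto3

cloudwatch = boto3.client("cloudwatch")
end = datetime.now(timezone.utc)
start = end - timedelta(minutes=30)

def metric_datapoints(namespace, metric, dimensions, statistic):
    """Fetch per-minute datapoints for a metric over the last 30 minutes."""
    response = cloudwatch.get_metric_statistics(
        Namespace=namespace,
        MetricName=metric,
        Dimensions=dimensions,
        StartTime=start,
        EndTime=end,
        Period=60,
        Statistics=[statistic],
    )
    return sorted(response["Datapoints"], key=lambda d: d["Timestamp"])

# Placeholders for the target stream and the DMS task/instance identifiers.
stream = [{"Name": "StreamName", "Value": "example-dms-target-stream"}]
task = [
    {"Name": "ReplicationInstanceIdentifier", "Value": "example-instance"},
    {"Name": "ReplicationTaskIdentifier", "Value": "example-task"},
]

throttled = metric_datapoints("AWS/Kinesis", "WriteProvisionedThroughputExceeded", stream, "Sum")
put_records = metric_datapoints("AWS/Kinesis", "PutRecords.Records", stream, "Sum")
rows_source = metric_datapoints("AWS/DMS", "CDCThroughputRowsSource", task, "Average")
rows_target = metric_datapoints("AWS/DMS", "CDCThroughputRowsTarget", task, "Average")

# Compare the series: PutRecords.Records, CDCThroughputRowsSource, and
# CDCThroughputRowsTarget should roughly track one another when tuning is effective.
print(len(throttled), len(put_records), len(rows_source), len(rows_target))
```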
Conclusion
In this post, we talked about the high-level architecture of multi-threaded full load and CDC settings, and considerations and general best practices to tune related parameters for better performance to replicate to a Kinesis Data Streams target. With the complexity involved in database migrations, we highly recommend testing the parameters in non-production environments with simulated production workloads prior to making changes in production.
In the second post in this series, we demonstrate the multi-threaded replication with different settings and evaluate the performance.
We welcome your feedback. If you have any questions or suggestions, leave them in the comments section.
About the Authors
Siva Thang is a Senior Solutions Architect, Partners with AWS. His specialty is in databases and analytics, and he also holds a master’s degree in Engineering. Siva is deeply passionate about helping customers build a modern data platform in the cloud that includes migrating to modern relational databases and building data lakes and data pipelines at scale for analytics and machine learning. Siva also likes to present in various conferences and summits on the topic of modern databases and analytics.
Suchindranath Hegde is a Data Migration Specialist Solutions Architect at Amazon Web Services. He works with our customers to provide guidance and technical assistance on data migration into the AWS Cloud using AWS DMS.
Wanchen Zhao is a Senior Database Specialist Solutions Architect at AWS. Wanchen specializes in Amazon RDS and Amazon Aurora, and is a subject matter expert for AWS DMS. Wanchen works with SI and ISV partners to design and implement database migration and modernization strategies and provides assistance to customers for building scalable, secure, performant, and robust database architectures in the AWS Cloud.
Michael Newlin is a Cloud Support DBE with Amazon Web Services and Subject Matter Expert for AWS DMS. At AWS, he works with customers and internal teams to ensure smooth and fast transitions of database workloads to AWS.
Jay Chen is a Software Development Manager at AWS DMS, where he oversees the development of DMS endpoints, including S3, Kinesis, Kafka, OpenSearch, Timestream, and others. Jay is widely recognized for his significant contributions to the database field. He co-authored the Star Schema Benchmark, which is a standard benchmark based on the TPC-H benchmark for OLAP databases. Moreover, he has contributed as a co-author to C-Store: A Column-oriented DBMS.