Last year and early this year, we open-sourced MongoShake and RedisShake, which handle migration, synchronization, backup and other requirements for MongoDB and Redis. DynamoShake is a new tool for DynamoDB migration. Migration from DynamoDB to MongoDB is currently supported, but we will consider supporting more channels in the future, such as direct backup to files, migration to Kafka, or migration to other databases such as Cassandra and Redis.


Download address: temporarily closed, please contact Zhu Zhao.

DynamoShake basic functions

DynamoShake supports full and incremental synchronization. After the process starts, full synchronization is performed first, followed by incremental synchronization once the full stage completes. Full synchronization consists of two parts: data synchronization and index synchronization; index synchronization begins after data synchronization is complete. Incremental synchronization only synchronizes data; indexes generated during the incremental stage are not synchronized. In addition, DDL operations on the source tables, such as dropping tables, creating tables, or creating indexes, are not supported during either the full or the incremental stage.



Resumable synchronization

Full synchronization does not support resuming from a breakpoint; incremental synchronization does. That is, if incremental synchronization is interrupted, it can resume from where it left off within a certain period of time. However, in some cases, for example when the disconnection lasts too long or the previous checkpoint (see below) is lost, a full synchronization is triggered again.

Data synchronization

All tables on the source end are written to different tables in the destination library (by default: table1, table2, and so on). For example, if the user has table1 and table2, the destination will contain a dynamo-shake database with collections table1 and table2. In native DynamoDB, the protocol wraps a layer of type fields around each value, in the "key: {type: value}" format. For example, if a user inserts a {hello: 1} document, the DynamoDB interface returns it in the {"hello": {"N": "1"}} format. DynamoDB supports the following data types:

  • String
  • Binary
  • Number
  • StringSet
  • NumberSet
  • BinarySet
  • Map
  • List
  • Boolean
  • Null
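The type-stripping described above can be sketched as a small recursive converter. The following is an illustrative sketch of what the "change" conversion mode does, not DynamoShake's actual code; the function name `stripType` is our own:

```go
package main

import (
	"fmt"
	"strconv"
)

// stripType converts a DynamoDB-style typed attribute value such as
// {"N": "1234"} or {"L": [...]} into a plain value, mirroring the
// "change" conversion mode. Illustrative sketch only.
func stripType(attr map[string]interface{}) interface{} {
	for typ, val := range attr {
		switch typ {
		case "S", "BOOL":
			return val
		case "N":
			n, _ := strconv.ParseFloat(val.(string), 64)
			return n
		case "NULL":
			return nil
		case "SS", "NS", "BS":
			return val // sets are kept as plain arrays
		case "L":
			items := val.([]interface{})
			out := make([]interface{}, len(items))
			for i, it := range items {
				out[i] = stripType(it.(map[string]interface{}))
			}
			return out
		case "M":
			fields := val.(map[string]interface{})
			out := make(map[string]interface{}, len(fields))
			for k, it := range fields {
				out[k] = stripType(it.(map[string]interface{}))
			}
			return out
		}
	}
	return nil
}

func main() {
	// {"L": [{"S": "aa1"}, {"N": "1234"}]} -> ["aa1", 1234]
	raw := map[string]interface{}{
		"L": []interface{}{
			map[string]interface{}{"S": "aa1"},
			map[string]interface{}{"N": "1234"},
		},
	}
	fmt.Println(stripType(raw))
}
```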

Raw mode writes the data exactly as obtained from the native DynamoDB interface:

rszz-4.0-2:PRIMARY> use dynamo-shake
switched to db dynamo-shake
rszz-4.0-2:PRIMARY> db.zhuzhao.find()
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd845"), "aaa" : { "L" : [ { "S" : "aa1" }, { "N" : "1234" } ] }, "hello_world" : { "S" : "f2" } }
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd847"), "aaa" : { "N" : "222" }, "qqq" : { "SS" : [ "h1", "h2" ] }, "hello_world" : { "S" : "yyyyyyyyyyy" }, "test" : { "S" : "aaa" } }
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd849"), "aaa" : { "L" : [ { "N" : "0" }, { "N" : "1" }, { "N" : "2" } ] }, "hello_world" : { "S" : "Test Chinese" } }

Change mode strips the type field:

rszz-4.0-2:PRIMARY> use dynamo-shake
switched to db dynamo-shake
rszz-4.0-2:PRIMARY> db.zhuzhao.find()
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd845"), "aaa" : [ "aa1", 1234 ], "hello_world" : "f2" }
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd847"), "aaa" : 222, "qqq" : [ "h1", "h2" ], "hello_world" : "yyyyyyyyyyy", "test" : "aaa" }
{ "_id" : ObjectId("5d43f8f8c51d73b1ba2cd849"), "aaa" : [ 0, 1, 2 ], "hello_world" : "Test Chinese" }

Users can choose the conversion type that suits their own needs.

Checkpoints

Incremental resume is implemented based on checkpoints. By default, checkpoints are written to the destination MongoDB, in a database named dynamo-shake-checkpoint. A checkpoint collection is recorded for each table, plus a status_table collection that records whether the current stage is full or incremental synchronization.

rszz-4.0-2:PRIMARY> use dynamo-shake42-checkpoint
switched to db dynamo-shake42-checkpoint
rszz-4.0-2:PRIMARY> show collections
status_table
zz_incr0
zz_incr1
rszz-4.0-2:PRIMARY> db.status_table.find()
{ "_id" : ObjectId("5d6e0ef77e592206a8c86bfd"), "key" : "status_key", "status_value" : "incr_sync" }
rszz-4.0-2:PRIMARY> db.zz_incr0.find()
{ "_id" : ObjectId("5d6e0ef17e592206a8c8643a"), "shard_id" : "shardId-00000001567391596311-61ca009c", "father_id" : "shardId-00000001567375527511-6a3ba193", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }
{ "_id" : ObjectId("5d6e0ef17e592206a8c8644c"), "shard_id" : "shardId-00000001567406847810-f5b6578b", "father_id" : "shardId-00000001567391596311-61ca009c", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }
{ "_id" : ObjectId("5d6e0ef17e592206a8c86456"), "shard_id" : "shardId-00000001567422218995-fe7104bc", "father_id" : "shardId-00000001567406847810-f5b6578b", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }
{ "_id" : ObjectId("5d6e0ef17e592206a8c86460"), "shard_id" : "shardId-00000001567438304561-d3dc6f28", "father_id" : "shardId-00000001567422218995-fe7104bc", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }
{ "_id" : ObjectId("5d6e0ef17e592206a8c8646a"), "shard_id" : "shardId-00000001567452243581-ed601f96", "father_id" : "shardId-00000001567438304561-d3dc6f28", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }
{ "_id" : ObjectId("5d6e0ef17e592206a8c86474"), "shard_id" : "shardId-00000001567466737539-cc721900", "father_id" : "shardId-00000001567452243581-ed601f96", "seq_num" : "", "status" : "no need to process", "worker_id" : "unknown-worker", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "", "update_date" : "" }
{ "_id" : ObjectId("5d6e0ef27e592206a8c8647e"), "shard_id" : "shardId-00000001567481807517-935745a3", "father_id" : "shardId-00000001567466737539-cc721900", "seq_num" : "", "status" : "done", "worker_id" : "unknown-worker", "iterator_type" : "LATEST", "shard_it" : "arn:aws:dynamodb:us-east-...:table/zz_incr0/stream/2019-08-27T08:...|1|AAAAAAAAAAGsTOg0+3+yzzD1cTzc7hyTPXi/iBi7sA5Q6SGSoaAJ2gz2deQu5aPRW/flYK0pG9ZUvmCfWqe1A5usMFWfVvd+yubMwWSHfV2IPVs36TaQnqpMvsywll/x7IVlCgmsjr6jStyonbuHlUYwKtUSq8t0tFvAQXtKi0zzS25fQpITy/nIb2y/FLppcbV/iZ+ae1ujgWGRoojhJ0FiYPhmbrR5ZBY2dKwEpok+QeYMfF3cEOkA4iFeuqtboUMgVqBh0zUn87iyTFRd6Xm49PwWZHDqtj/jtpdFn0CPoQPj2ilapjh9lYq/ArXMai5DUHJ7xnmtSITsyzUHakhYyIRXQqF2UWbDK3F7+Bx5d4rub1d4S2yqNUYA2eZ5CySeQz7CgvzaZT391axoqKUjjPpdUsm05zS003cDDwrzxmLnFi0/mtoJdGoO/FX9LXuvk8G3hgsDXBLSyTggRE0YM+feER8hPgjRBqbfubhdjUxR+VazwjcVO3pzt2nIkyKPStPXJZIf4cjCagTxQpC/UPMtcwWNo2gQjM2XSkWpj7DGS2E4738biV3mtKXGUXtMFVecxTL/qXy2qpLgy4dD3AG0Z7pE+eJ9qP5YRE6pxQeDlgbERg==", "update_date" : "" }
{ "_id" : ObjectId("5d6e1d807e592206a8c9a102"), "shard_id" : "shardId-00000001567497561747-03819eba", "father_id" : "shardId-00000001567481807517-935745a3", "seq_num" : "39136900000000000325557205", "status" : "in processing", "worker_id" : "unknown", "iterator_type" : "AT_SEQUENCE_NUMBER", "shard_it" : "arn:aws:dynamodb:us-east-...:table/zz_incr0/stream/2019-08-27T08:...|1|AAAAAAAAAAFw/qdbPLjsXMlPalnhh55koia44yz6A1W2uwUyu/MzRUhaaqnI0gPM8ebVgy7dW7dDWLTh/WXYyDNNyXR3Hvk01IfEKDf+FSLMNvh2iELdrO5tRoLtZI2fxxrPZvudRc3KShX0Pvqy2YYwl4nlBR6QezHTWx5H2AU22MGPTx8aMRbjUgPwvgEExRgdzfhG6G9gkc7C71fIc98azwpSm/IW+mV/h/doFndme47k2v8g0GNJvgLSoET7HdJYH3XFdqh4QVDIP4sbz8X1cpN3y8AlT7Muk2/yXOdNeTL6tApuonCrUpJME9/qyBYQVI5dsAHnAWaP2Te3EAvz3ao7oNdnA8O6uz5VF9zFdN1OUHWM40kLUsX4sHve7McEzFLgf4NL1WTAnPN13cFhEm9BS8M7tiJqZ0OzgkbF1AWfq+xg/O6c57/Vvx/G/75DZ8XcWIABgGNkWBET/vLDrgjJQ0PUZJZKNmmbgKKTyHgSl4YOXNEeyH7l6atuc2WaREDjbf7lnQO5No11sz4g3O+AreBcpGVhdZNhGGcrG/wduPYEZfg2hG1sfYiSAM8GipUPMA0PM7JPIJmqCaY90JxRcI1By24tpp9Th35/5rLTGPYJZA==", "update_date" : "" }

"status_value" : "incr_sync" in the status_table collection indicates that synchronization has entered the incremental phase. Each incremental shard records its own checkpoint. For details about shard splitting rules, refer to DynamoDB's official documentation. The fields of a checkpoint document have the following meanings:

  • _id: MongoDB's own primary key ID
  • shard_id: ID of the shard; each shard has a unique ID
  • father_id: ID of the parent shard; a shard may have a parent shard
  • seq_num: the sequence number within the shard processed so far; this is the primary checkpoint information
  • status: the current synchronization stage, one of the following states:

    • "not process": not yet processed
    • "no need to process": no processing needed
    • "prepare stage": preparing to process
    • "in processing": being processed
    • "wait father finish": waiting for the parent shard to finish
    • "done": processing complete

  • worker_id: ID of the worker processing this shard
  • iterator_type: the traversal mode of the shard
  • shard_it: the shard's iterator address; secondary checkpoint information
  • update_date: timestamp of the last checkpoint update

Indexes

A unique index is created based on the default primary key, and the shard key is set based on the partition key. The user's own indexes (GSIs) are not currently created.
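For illustration, the destination-side setup just described might look roughly like the following in the mongo shell. This is a sketch, not DynamoShake's actual commands, and the collection name and key name "pk" are hypothetical placeholders:

```
// unique index on the DynamoDB primary key (assumed here to be "pk")
db.table1.createIndex({ "pk": 1 }, { unique: true })
// shard key based on the partition key, for a sharded destination
sh.shardCollection("dynamo-shake.table1", { "pk": "hashed" })
```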

DynamoShake internal architecture

This section describes some architectural details of DynamoShake.

Full synchronization

The following figure shows the basic data synchronization architecture for one table (dynamo-shake starts one tableSyncer per table, and the concurrency is controlled by the user). The fetcher thread pulls data from the source DynamoDB and pushes it into a queue; parser threads then take data from the queue and parse it (from the DynamoDB format into BSON); finally, the executor aggregates batches of data and writes them to MongoDB.



  • Fetcher. Currently there is only one fetcher thread, which uses the protocol-conversion driver provided by AWS. The fetcher fetches data from the source and pushes it into the queues, one queue per parser. The fetcher is separated out mainly because of network I/O: fetching is limited by the network and is relatively slow.
  • Parser. Multiple parsers can be started (2 by default), and the number can be controlled with the full.document.parser parameter. A parser reads data from its queue and parses it into a BSON structure, then writes the documents one by one to the executor's queue. The parser runs in its own threads mainly because parsing is CPU-intensive.
  • Executor. Multiple executors can also be started (4 by default), controlled by the full.document.concurrency parameter. An executor pulls data from its queue, aggregates it into batches (up to 16 MB or 1024 documents per batch), and writes the batches to the destination MongoDB.

    After all data has been written to the destination, the tableSyncer exits.
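The fetcher → parser → executor pipeline described above can be sketched with goroutines and channels acting as the queues. This is a minimal illustration under our own assumptions (names, queue sizes, and thread counts are illustrative, not DynamoShake's real implementation):

```go
package main

import (
	"fmt"
	"sync"
)

// runPipeline wires a single fetcher, two parsers, and one executor
// together with channels as queues, and returns how many documents
// were "written". Illustrative sketch only.
func runPipeline(rawItems []string) int {
	fetchQ := make(chan string, 16) // fetcher -> parser queue
	execQ := make(chan string, 16)  // parser -> executor queue

	// fetcher: a single thread pulling from the source
	go func() {
		for _, it := range rawItems {
			fetchQ <- it
		}
		close(fetchQ)
	}()

	// parsers: CPU-bound conversion (DynamoDB format -> BSON in the real tool)
	var parsers sync.WaitGroup
	for i := 0; i < 2; i++ {
		parsers.Add(1)
		go func() {
			defer parsers.Done()
			for it := range fetchQ {
				execQ <- "parsed:" + it
			}
		}()
	}
	go func() { parsers.Wait(); close(execQ) }()

	// executor: drains the queue and "writes" to the destination
	written := 0
	for range execQ {
		written++
	}
	return written
}

func main() {
	fmt.Println(runPipeline([]string{"a", "b", "c"}))
}
```

The design point this illustrates is the one the text makes: each stage has a different bottleneck (network, CPU, destination I/O), so decoupling them through queues lets each stage scale its concurrency independently.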

Incremental synchronization

The overall incremental architecture is as follows:



The fetcher thread is responsible for sensing shard changes in the stream, and the manager is responsible for relaying the notification or creating a new dispatcher to process it, with one dispatcher per shard. The dispatcher pulls incremental data from the source, parses, packages, and batches the data through the batcher, then writes it to MongoDB via the executor and updates the checkpoint. In addition, if this is a resumed run, the dispatcher starts pulling from the old checkpoint instead of starting from scratch.
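The resume decision at the end of that flow can be sketched as follows. The struct and function are our own illustration, assuming the checkpoint fields shown earlier; the iterator types are standard DynamoDB Streams values, but the mapping logic here is hypothetical, not DynamoShake's actual code:

```go
package main

import "fmt"

// shardCheckpoint mirrors the relevant checkpoint fields shown above.
type shardCheckpoint struct {
	ShardID string
	SeqNum  string // last processed sequence number, "" if none
}

// resumeIterator decides where a dispatcher should start reading a
// shard: from the recorded position if one exists, otherwise from the
// beginning of the shard. Illustrative sketch only.
func resumeIterator(cp shardCheckpoint) string {
	if cp.SeqNum == "" {
		// no previous position recorded: read the shard from the start
		return "TRIM_HORIZON"
	}
	// resume right after the last processed record
	return "AFTER_SEQUENCE_NUMBER:" + cp.SeqNum
}

func main() {
	fmt.Println(resumeIterator(shardCheckpoint{ShardID: "shard-0"}))
	fmt.Println(resumeIterator(shardCheckpoint{ShardID: "shard-1", SeqNum: "39136900000000000325557205"}))
}
```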

Using DynamoShake

Start it with ./dynamo-shake -conf=dynamo-shake.conf. The parameters are specified in dynamo-shake.conf, with the following meanings:

  • id: changing this will change the name of the destination library on MongoDB
  • log.file: log file; if not configured, logs are printed to standard output
  • log.level: log level; the default is recommended
  • log.buffer: whether to buffer log output; the default is recommended
  • system_profile: port number for printing the internal stack; the default is recommended
  • http_profile: not currently enabled
  • sync_mode: synchronization mode; all: full + incremental, full: full only, incr: incremental only (currently not supported)
  • source.access_key_id: DynamoDB connection parameter
  • source.secret_access_key: DynamoDB connection parameter
  • source.session_token: DynamoDB connection parameter; can be left blank if there is no temporary token
  • source.region: DynamoDB connection parameter
  • filter.collection.white: whitelist filter; only the specified tables are synchronized
  • filter.collection.black: blacklist filter; the specified tables are not synchronized
  • qps.full: rate limit for the full phase, the number of requests sent per second
  • qps.full.batch_num: maximum number of items per request in the full phase
  • qps.incr: rate limit for the incremental phase, the number of requests sent per second
  • qps.incr.batch_num: maximum number of items per request in the incremental phase
  • target.type: destination configuration; currently only MongoDB is supported
  • target.address: connection address of the destination MongoDB
  • target.mongodb.type: whether the destination MongoDB is a replica set or a sharded cluster
  • target.mongodb.exist: what to do if a table with the same name already exists at the destination; drop: delete it, rename: rename it, blank: do nothing
  • full.concurrency: number of full-sync threads; each thread handles one table
  • full.document.concurrency: write concurrency within one table
  • full.document.parser: number of parser threads within one table
  • full.enable_index.primary: whether to synchronize DynamoDB's primary key
  • full.enable_index.user: whether to synchronize user-created indexes
  • convert.type: write mode; raw writes the data as-is, change writes the data after stripping the type field; see the explanation above
  • increase.concurrency: incremental synchronization concurrency, the maximum number of shards that can be fetched at one time
  • checkpoint.address: storage address of the checkpoint; if not configured, it defaults to the destination library address
  • checkpoint.db: name of the db the checkpoint is written to; the default is $db-checkpoint
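Putting a few of the parameters above together, a dynamo-shake.conf for a full + incremental run might look roughly like this. The values are illustrative only, and the exact file syntax is assumed from the parameter names listed above:

```
id = dynamo-shake
log.file =
log.level = info
sync_mode = all
source.access_key_id = xxx
source.secret_access_key = xxx
source.region = ap-east-1
qps.full = 1000
qps.full.batch_num = 128
target.type = mongodb
target.address = mongodb://10.1.1.1:30441
target.mongodb.type = replica
target.mongodb.exist = drop
full.concurrency = 4
full.document.concurrency = 4
full.document.parser = 2
convert.type = change
```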

DynamoFullCheck

DynamoFullCheck is a tool for checking whether DynamoDB and MongoDB data are consistent. It currently supports only full verification, not incremental: during incremental synchronization, the source and destination are not consistent at any given moment. It supports only one-way verification, that is, checking whether the DynamoDB data is a subset of the MongoDB data; the reverse direction is not checked. It also supports sampled verification, checking only the tables of interest. Verification consists of the following parts:

  • Outline check. First, check whether the document counts of the tables on both sides match; then check whether the indexes match (index checking is not currently implemented). Note that if the counts do not match, the tool exits directly without further verification.
  • Exact check. Verify the data precisely. The principle is to pull data from the source and parse it; if there is a unique index, look up the corresponding MongoDB document via that index and compare them; if there is no unique index, perform a (heavy) lookup in MongoDB using the entire document. Sampling principle:

When exact checking with sampling is enabled, a sampling decision is made for each document to determine whether it needs checking. The rule is simple: for a 30% sample rate, generate a random number in 0~100 and check the document only if the number falls in 0~30; otherwise skip it. Because DynamoFullCheck also needs to fetch and parse data from the source DynamoDB, it reuses part of DynamoShake's code; the difference is that the fetcher, parser, and executor threads inside DynamoFullCheck each have a concurrency of 1.

Usage parameters

The full-check parameters are slightly simpler and can be passed directly on the command line, for example:
./dynamo-full-check --sourceAccessKeyID=BUIASOISUJPYS5OP3P5Q --sourceSecretAccessKey=TwWV9reJCrZhHKSYfqtTaFHW0qRPvjXb3m8TYHMe --sourceRegion=ap-east-1 -t="10.1.1.1:30441" --sample=300

Usage:
  dynamo-full-check.darwin [OPTIONS]
Application Options:
  -i, --id=                    target database collection name (default: dynamo-shake)
  -l, --logLevel=
  -s, --sourceAccessKeyID=     dynamodb source access key id
      --sourceSecretAccessKey= dynamodb source secret access key
      --sourceSessionToken=    dynamodb source session token
      --sourceRegion=          dynamodb source region
      --qpsFull=               qps of scan command, default is 10000
      --qpsFullBatchNum=       batch number in each scan command, default is 128
  -t, --targetAddress=         mongodb target address
  -d, --diffOutputFile=        diff output file name (default: dynamo-full-check-diff)
  -p, --parallel=              how many threads used to compare, default is 16 (default: 16)
  -e, --sample=                comparison sample number for each table, 0 means disable (default: 1000)
      --filterCollectionWhite= only compare the given tables, split by '; '
      --filterCollectionBlack= do not compare the given tables, split by '; '
  -c, --convertType=           convert type (default: raw)
  -v, --version                print version
Help Options:
  -h, --help                   Show this help message

other

Will we open-source DynamoShake? Hmm... that is still undecided, so stay tuned.


Author: Zhu Zhao

Original link

This article is the original content of the cloud habitat community, shall not be reproduced without permission.