In the first part of this article, we created an Azure Cosmos DB SQL API account with test data and then used a .NET console application built on the Azure Cosmos SDK to perform some basic CRUD operations.
In this article, we will see how to migrate data from the Azure Cosmos DB SQL API to a MongoDB Atlas cluster by leveraging tools from Azure and MongoDB.
Prerequisites
MongoDB Atlas (Azure)
- Create an Atlas account.
- Create an Organisation with one project.
- Create an Atlas cluster (the peering option is available only on M10 and above).
- Create a DB user with access to any database.
Data Export
The current data in Azure has one collection in the cosmicworks database, as shown below.
Use the Data Migration Tool to export data from the Azure Cosmos DB SQL API as JSON files. Download the binaries and configure the source and target information.
The Data Migration tool is an open-source solution that imports/exports data to/from Azure Cosmos DB. While the tool includes a graphical user interface (dtui.exe), it can also be driven from the command-line (dt.exe).
The Azure Cosmos DB JSON exporter allows you to export any of the available source options to a JSON file that has an array of JSON documents. The tool handles the export for you. Alternatively, you can choose to view the resulting migration command and run the command yourself. The resulting JSON file may be stored locally or in Azure Blob storage.
The primary connection string, along with the database name, is provided as the source information.
The local directory is provided as the target information.
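For reference, the same export can also be driven from the command line with dt.exe. The sketch below follows the tool's documented option names; the endpoint, key, and file path are placeholders for your own values:

dt.exe /s:DocumentDB /s.ConnectionString:"AccountEndpoint=https://<cosmosinstance-name>.documents.azure.com:443/;AccountKey=<cosmosdbprimarykey>;Database=cosmicworks" /s.Collection:products /t:JsonFile /t.File:products.json /t.Overwrite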
The products collection was exported successfully with 295 documents.
Data Import
Use the mongoimport command to import the data exported from Azure Cosmos DB into Atlas.
mongoimport --uri mongodb+srv://<USERNAME>:<PASSWORD>@<hostname>/<DATABASE> --collection <COLLECTION> --type json --file <FILENAME>
Once the command executes successfully, the documents are restored to Atlas, and mongoimport reports the total number of documents restored.
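As an illustration, assuming the products.json file produced in the export step and the cosmicworks database used earlier, the command could look like the following; because the Data Migration Tool writes the documents as a single JSON array, the --jsonArray flag is added:

mongoimport --uri "mongodb+srv://<USERNAME>:<PASSWORD>@<hostname>/cosmicworks" --collection products --type json --file products.json --jsonArray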
Alternatively, we can use MongoDB Compass to import the documents, as shown below.
The Atlas portal will now show the restored collection.
Initial Snapshot
Using this combination of tools, we can complete the initial snapshot as depicted in the diagram below. But what happens to documents that are updated or newly inserted during the migration? What if the client decides to have the ongoing changes replicated to Atlas for a few days before performing the application migration? Can we have the ongoing data available in Atlas? Yes, with CDC, it is possible.
Change Data Capture (CDC) is a common architecture pattern used for detecting changes as they happen. It can be applied to a variety of use cases, ranging from cache invalidation, ETL, and stream processing to data replication.
Change Data Capture
We can listen to the ongoing changes in Cosmos DB using the source connector provided by Azure and push the changes as plain JSON to a Kafka topic. Optionally, if any transformation is required, we can add ksqlDB applications to perform the transformation and push the result to the final topic.
We then leverage the MongoDB Atlas sink connector to push the ongoing messages to the Atlas cluster.
Below is a diagram depicting the flow of change stream messages from the Cosmos DB SQL API to MongoDB Atlas.
Prerequisites
Confluent Setup
- Set up Confluent Cloud or the Confluent Platform using Docker.
- Create a Kafka topic using Confluent Control Center. For this scenario, we'll create a Kafka topic named "topics_1" and write non-schema-embedded JSON data to it (a CLI alternative is sketched after this list).
- An Atlas cluster with the initial snapshot data.
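If you prefer the command line over Control Center, the same topic can be created with the Confluent Platform CLI against a local Docker-based broker; the bootstrap address below is an assumption for a default local setup:

kafka-topics --bootstrap-server localhost:9092 --create --topic topics_1 --partitions 1 --replication-factor 1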
CosmosDB Source Connector Setup
Kafka Connect for Azure Cosmos DB is a connector to read from and write data to Azure Cosmos DB. The Azure Cosmos DB source connector provides the capability to read data from the Azure Cosmos DB change feed and publish this data to a Kafka topic.
Configure the Cosmos DB SQL API instance with the required endpoint, connection key, and database name.
Configure the topic-container map as <topicName>#<collectionName>, e.g., topics_1#products. To specify multiple collections, follow the same format and delimit the entries with commas.
{
  "name": "cosmosdb-source-connector",
  "config": {
    "connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "connect.cosmos.task.poll.interval": "100",
    "connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
    "connect.cosmos.master.key": "<cosmosdbprimarykey>",
    "connect.cosmos.databasename": "kafkaconnect",
    "connect.cosmos.containers.topicmap": "topics_1#products",
    "connect.cosmos.offset.useLatest": false,
    "value.converter.schemas.enable": "false",
    "key.converter.schemas.enable": "false"
  }
}
Alternatively, you can create the Azure Cosmos DB source connector in Kafka Connect using the above JSON configuration. Make sure to replace the placeholder values for the connect.cosmos.connection.endpoint and connect.cosmos.master.key properties with the values you saved from the Azure Cosmos DB setup.
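If you run your own Kafka Connect worker, one common way to register the connector is through the Connect REST API. The worker URL below assumes a default local deployment, and cosmosdb-source.json is a hypothetical file holding the configuration shown above:

curl -X POST -H "Content-Type: application/json" --data @cosmosdb-source.json http://localhost:8083/connectors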
MongoDB Atlas Sink Connector Setup
Kafka Connect for MongoDB Atlas is a connector to read data from and write data to MongoDB Atlas. The MongoDB Atlas sink connector allows you to export data from Apache Kafka topics to an Atlas database. The connector polls data from Kafka and writes it to a collection in the database based on the subscribed topics.
Configure the topic that the sink connector subscribes to, as shown below.
Configure the host, database, and collection that the topic sinks to.
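For reference, below is a minimal sketch of what such a sink configuration could look like in JSON form; the connection string is a placeholder, the database and collection reuse the cosmicworks/products names from the initial snapshot, and the converter settings are an assumption matching the non-schema JSON used earlier:

{
  "name": "mongodb-atlas-sink-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "tasks.max": "1",
    "topics": "topics_1",
    "connection.uri": "mongodb+srv://<USERNAME>:<PASSWORD>@<hostname>/",
    "database": "cosmicworks",
    "collection": "products",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false"
  }
}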
Test Change Stream
Modify a document in Azure Cosmos DB; the source connector listens for the change and pushes a message to the Kafka topic, as shown below.
The modified message appears on the Kafka topic, as shown below.
The message is then written from the topic to the Atlas cluster, as shown below.
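To double-check on the Atlas side, you can also query the collection directly with mongosh; the connection string and the filter field below are placeholders for your own values:

mongosh "mongodb+srv://<USERNAME>:<PASSWORD>@<hostname>/cosmicworks" --eval 'db.products.find({ name: "<modified-product-name>" })'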
Overall Design
- Complete the initial snapshot of the data using Azure's Data Migration Tool and MongoDB's mongoimport.
- Capture any changes made during the initial snapshot, and subsequent changes until the application switch-over, using the Kafka connectors from Azure (Cosmos DB source connector) and MongoDB (Atlas sink connector).
In the next article, we will discuss some of the aspects involved in application migration when moving from the Cosmos DB SQL API to Atlas.