Azure DocumentDB bulk insert using stored procedure

Posted 2020-02-28 09:58

Question:

Hi, I am using 16 collections to insert around 3-4 million JSON objects, ranging from 5-10k per object. I am using a stored procedure to insert these documents. I have 22 Capacity Units.

function bulkImport(docs) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of imported docs, also used as current doc index.
    var count = 0;

    // Validate input.
    if (!docs) throw new Error("The array is undefined or null.");

    var docsLength = docs.length;
    if (docsLength == 0) {
        getContext().getResponse().setBody(0);
    }

    // Call the CRUD API to create a document.
    tryCreateOrUpdate(docs[count], callback);

    // Note that there are 2 exit conditions:
    // 1) The createDocument request was not accepted. 
    //    In this case the callback will not be called, we just call setBody and we are done.
    // 2) The callback was called docs.length times.
    //    In this case all documents were created and we don't need to call tryCreate anymore. Just call setBody and we are done.
    function tryCreateOrUpdate(doc, callback) {
        var isAccepted = true;
        var isFound = collection.queryDocuments(collectionLink, 'SELECT * FROM root r WHERE r.id = "' + doc.id + '"', function (err, feed, options) {
            if (err) throw err;
            if (!feed || !feed.length) {
                isAccepted = collection.createDocument(collectionLink, doc, callback);
            }
            else {
                // The metadata document.
                var existingDoc = feed[0];
                isAccepted = collection.replaceDocument(existingDoc._self, doc, callback);
            }
        });

        // If the request was accepted, callback will be called.
        // Otherwise report current count back to the client, 
        // which will call the script again with remaining set of docs.
        // This condition will happen when this stored procedure has been running too long
        // and is about to get cancelled by the server. This will allow the calling client
        // to resume this batch from the point we got to before isAccepted was set to false
        if (!isFound && !isAccepted) getContext().getResponse().setBody(count);
    }

    // This is called when collection.createDocument is done and the document has been persisted.
    function callback(err, doc, options) {
        if (err) throw err;

        // One more document has been inserted, increment the count.
        count++;

        if (count >= docsLength) {
            // If we have created all documents, we are done. Just set the response.
            getContext().getResponse().setBody(count);
        } else {
            // Create next document.
            tryCreateOrUpdate(docs[count], callback);
        }
    }
}

My C# code looks like this:

    public async Task<int> Add(List<JobDTO> entities)
    {
        int currentCount = 0;
        int documentCount = entities.Count;

        while (currentCount < documentCount)
        {
            string argsJson = JsonConvert.SerializeObject(entities.Skip(currentCount).ToArray());
            var args = new dynamic[] { JsonConvert.DeserializeObject<dynamic[]>(argsJson) };

            // 6. execute the batch.
            StoredProcedureResponse<int> scriptResult = await DocumentDBRepository.Client.ExecuteStoredProcedureAsync<int>(sproc.SelfLink, args);

            // 7. Prepare for next batch.
            int currentlyInserted = scriptResult.Response;

            currentCount += currentlyInserted;
        }

        return currentCount;
    }

The problem I am facing is that, out of the 400k documents I try to insert, some documents are occasionally missing, and no error is reported.

The application is a worker role deployed in the cloud. If I increase the number of threads or instances inserting into DocumentDB, the number of missing documents is much higher.

How can I figure out what the problem is? Thanks in advance.

Answer 1:

I found that when trying this code I would get an error at docs.length which stated that length was undefined.

function bulkImport(docs) {
    var collection = getContext().getCollection();
    var collectionLink = collection.getSelfLink();

    // The count of imported docs, also used as current doc index.
    var count = 0;

    // Validate input.
    if (!docs) throw new Error("The array is undefined or null.");

    var docsLength = docs.length; // length is undefined
}

After many tests (I could not find anything in the Azure documentation), I realized that I could not pass an array as was suggested. The parameter had to be an object, so I had to modify the batch code as shown below in order for it to run.

I also found that I could not simply pass an array of documents in the DocumentDB Script Explorer (Input box), even though the placeholder help text says you can.

This code worked for me:

// pseudo object for reference only
docObject = {
  "items": [{doc}, {doc}, {doc}]
}

function bulkImport(docObject) {
    var context = getContext();
    var collection = context.getCollection();
    var collectionLink = collection.getSelfLink();
    var count = 0;

    // Check input
    if (!docObject.items || !docObject.items.length) throw new Error("invalid document input parameter or undefined.");
    var docs = docObject.items;
    var docsLength = docs.length;
    if (docsLength == 0) {
        context.getResponse().setBody(0);
    }

    // Call the function to create a document.
    tryCreateOrUpdate(docs[count], callback);

    // Obviously I have truncated this function. The above code should help you understand what has to change.
}
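
One way to make the stored procedure tolerant of both input shapes is to normalize the argument before using it. The following is only a sketch: `normalizeDocs` is a hypothetical helper, not part of the DocumentDB API, and the JSON-string branch rests on the assumption that some clients may send the argument as serialized text.

```javascript
// Hypothetical helper: accept a bare array, an { items: [...] } wrapper,
// or (assumption) a JSON string that encodes either of those.
function normalizeDocs(input) {
    if (typeof input === "string") {
        input = JSON.parse(input); // throws if the text is not valid JSON
    }
    if (Array.isArray(input)) {
        return input;
    }
    if (input && Array.isArray(input.items)) {
        return input.items;
    }
    throw new Error("Expected an array of documents or an object with an 'items' array.");
}
```

Inside `bulkImport`, the first lines would then become `var docs = normalizeDocs(docObject);` followed by the usual length check.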

Hopefully Azure documentation will catch up or become easier to find if I missed it.

I'll also be placing a bug report for the Script Explorer in hopes that the Azurites will update.



Answer 2:

It’s important to note that stored procedures have bounded execution, in which all operations must complete within the server-specified request timeout duration. If an operation does not complete within that time limit, the transaction is automatically rolled back. To simplify development around time limits, all CRUD (Create, Read, Update, and Delete) operations return a Boolean value that represents whether that operation will complete. This Boolean value can be used as a signal to wrap up execution and to implement a continuation-based model to resume execution (this is illustrated in our code samples below).
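
As a minimal sketch of that pattern (not the official sample), the stored procedure below checks the Boolean returned by every call, including `queryDocuments`, and keeps the response body equal to the number of documents written so far. The name `bulkUpsert` is hypothetical, and the parameterized query object (`{ query, parameters }`) is assumed to be accepted by the server-side SDK version in use.

```javascript
// Sketch of an upsert stored procedure that honors the "accepted" Boolean.
// It is meant to run inside the database, where getContext() is provided.
function bulkUpsert(docs) {
    var context = getContext();
    var collection = context.getCollection();
    var collectionLink = collection.getSelfLink();
    var response = context.getResponse();
    var count = 0; // documents persisted so far, also the next index

    if (!docs) throw new Error("The array is undefined or null.");
    // Set the body up front so the client always receives a count.
    response.setBody(count);
    if (docs.length === 0) return;

    upsertNext();

    function upsertNext() {
        var doc = docs[count];
        // Assumption: parameterized queries are supported here; this avoids
        // building the query by string concatenation.
        var query = {
            query: "SELECT * FROM root r WHERE r.id = @id",
            parameters: [{ name: "@id", value: doc.id }]
        };
        var accepted = collection.queryDocuments(collectionLink, query, function (err, feed) {
            if (err) throw err;
            var writeAccepted = (feed && feed.length)
                ? collection.replaceDocument(feed[0]._self, doc, onWritten)
                : collection.createDocument(collectionLink, doc, onWritten);
            // Write refused: stop here and report how far we got.
            if (!writeAccepted) response.setBody(count);
        });
        // Query refused: stop here and report how far we got.
        if (!accepted) response.setBody(count);
    }

    function onWritten(err) {
        if (err) throw err;
        count++;
        response.setBody(count);
        if (count < docs.length) upsertNext();
    }
}
```

The design choice here is that the body is updated after every successful write, so whichever call is refused first, the client sees an accurate count and can resend the rest.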

The bulk-insert stored procedure in the question implements the continuation model by returning the number of documents successfully created. This is noted in the stored procedure's comments:

    // If the request was accepted, callback will be called.
    // Otherwise report current count back to the client, 
    // which will call the script again with remaining set of docs.
    // This condition will happen when this stored procedure has been running too long
    // and is about to get cancelled by the server. This will allow the calling client
    // to resume this batch from the point we got to before isAccepted was set to false
    if (!isFound && !isAccepted) getContext().getResponse().setBody(count);

If the output document count is less than the input document count, you will need to re-run the stored procedure with the remaining set of documents.
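
The client side of that continuation model can be sketched as a resume loop. This is an illustration under stated assumptions rather than SDK code: `bulkImportAll` is a hypothetical name, and `executeSproc` stands in for whatever call your client library uses to run the stored procedure and return the count it reports.

```javascript
// Client-side resume loop (sketch). `executeSproc` is a hypothetical async
// function that sends `docs` to the stored procedure and resolves to the
// number of documents the procedure reports as written.
async function bulkImportAll(docs, executeSproc) {
    let written = 0;
    let stalls = 0;
    while (written < docs.length) {
        const remaining = docs.slice(written);
        const n = await executeSproc(remaining);
        // A missing or out-of-range count would silently skip documents,
        // so treat it as an error instead of adding it to the total.
        if (!Number.isInteger(n) || n < 0 || n > remaining.length) {
            throw new Error("Unexpected count from stored procedure: " + n);
        }
        // Repeated zero progress would otherwise loop forever.
        stalls = n === 0 ? stalls + 1 : 0;
        if (stalls >= 3) {
            throw new Error("No progress after 3 attempts at index " + written);
        }
        written += n;
    }
    return written;
}
```

Validating the returned count on every iteration is the point of the sketch: a loop that blindly adds the result to its offset cannot tell a partial batch from a failed one.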



Answer 3:

Since May 2018 there is a new Batch SDK for Cosmos DB. There is a GitHub repo to get you started.

I have been able to import 100,000 records in 9 seconds. Using Azure Batch to fan out the inserts, I have loaded 19 million records in 1m15s. This was on a collection provisioned at 1.66 million RU/s, which you can scale down after the import.