Bulk insert strategy from C# to SQL Server

Published 2019-04-07 21:16

Question:

In our current project, customers will send collections of complex/nested messages to our system. The frequency of these messages is approximately 1000-2000 messages per second.

These complex objects contain the transaction data (to be added) as well as master data (which will be added if not found). But instead of passing the IDs of the master data, the customer passes the 'name' column.

The system checks whether master data exists for these names. If found, it uses the IDs from the database; otherwise it creates the master data first and then uses the new IDs.

Once the master data IDs are resolved, the system inserts the transactional data into a SQL Server database (using the master data IDs). The number of master entities per message is around 15-20.

Following are some strategies we can adopt.

  1. We can resolve the master IDs first from our C# code (inserting master data if not found) and store these IDs in a C# cache. Once all IDs are resolved, we can bulk insert the transactional data using the SqlBulkCopy class. We would hit the database 15 times to fetch the IDs for the different entities and then hit it one more time to insert the final data. We can use the same connection throughout and close it after all of this processing.

  2. We can send all of these messages, containing both master data and transactional data, to the database in a single hit (in the form of multiple TVPs). Then, inside a stored procedure, we create the missing master data first and insert the transactional data afterwards.

Could anyone suggest the best approach in this use case?

Due to privacy concerns, I cannot share the actual object structure. But here is a hypothetical object structure that is very close to our business object.

One such message will contain information about one product (its master data) and its price details (transaction data) from different vendors:

Master data (which needs to be added if not found)

Product name: ABC, ProductCategory: XYZ, Manufacturer: XXX and some other details (the number of properties is in the range of 15-20).

Transaction data (which will always be added)

Vendor Name: A, ListPrice: XXX, Discount: XXX

Vendor Name: B, ListPrice: XXX, Discount: XXX

Vendor Name: C, ListPrice: XXX, Discount: XXX

Vendor Name: D, ListPrice: XXX, Discount: XXX

Most of the master data will remain the same across messages belonging to one product (and will change less frequently), but the transaction data will always fluctuate. So, the system will check whether the product 'XXX' exists in the system. If not, it checks whether the 'Category' mentioned with this product exists. If not, it will insert a new record for the category and then for the product. The same will be done for Manufacturer and the other master data.

Multiple vendors will be sending data about multiple products (2000-5000) at the same time.

So, assume that we have 1000 vendors, and each vendor is sending data about 10-15 different products. Every 2-3 seconds, each vendor sends us price updates for these 10 products. A vendor may start sending data about new products, but that will not be very frequent.

Answer 1:

You would likely be best off with your #2 idea (i.e. sending all of the 15 - 20 entities to the DB in one shot using multiple TVPs and processing as a whole set of up to 2000 messages).

Caching master data lookups at the app layer and translating prior to sending to the DB sounds great, but misses something:

  1. You are going to have to hit the DB to get the initial list anyway
  2. You are going to have to hit the DB to insert new entries anyway
  3. Looking up values in a dictionary to replace with IDs is exactly what a database does (assuming a Non-Clustered Index on each of these name-to-ID lookups)
  4. Frequently queried values will have their datapages cached in the buffer pool (which is a memory cache)

Why duplicate at the app layer what is already provided and happening right now at the DB layer, especially given:

  • The 15 - 20 entities can have up to 20k records (which is a relatively small number, especially when considering that the Non-Clustered Index only needs to be two fields: Name and ID which can pack many rows into a single data page when using a 100% Fill Factor).
  • Not all 20k entries are "active" or "current", so you don't need to worry about caching all of them. So whatever values are current will be easily identified as the ones being queried, and those data pages (which may include some inactive entries, but no big deal there) will be the ones to get cached in the Buffer Pool.

Hence, you don't need to worry about aging out old entries OR forcing any key expirations or reloads due to possibly changing values (i.e. updated Name for a particular ID) as that is handled naturally.

Yes, in-memory caching is wonderful technology and greatly speeds up websites, but those scenarios / use-cases are for when non-database processes are requesting the same data over and over for purely read-only purposes. This particular scenario is one in which data is being merged and the list of lookup values can be changing frequently (more so due to new entries than due to updated entries).


That all being said, Option #2 is the way to go. I have done this technique several times with much success, though not with 15 TVPs. It might be that some optimizations / adjustments need to be made to the method to tune this particular situation, but what I have found to work well is:

  • Accept the data via TVP. I prefer this over SqlBulkCopy because:
    • it makes for an easily self-contained Stored Procedure
    • it fits very nicely into the app code to fully stream the collection(s) to the DB without needing to copy the collection(s) to a DataTable first, which duplicates the collection and wastes CPU and memory. This requires that you create one method per collection that returns IEnumerable<SqlDataRecord>, accepts the collection as input, and uses yield return to send each record from inside the for or foreach loop.
  • TVPs are not great for statistics and hence not great for JOINing to (though this can be mitigated by using a TOP (@RecordCount) in the queries), but you don't need to worry about that anyway since they are only used to populate the real tables with any missing values
  • Step 1: Insert missing Names for each entity. Remember that there should be a NonClustered Index on the [Name] field for each entity, and assuming that the ID is the Clustered Index, that value will naturally be a part of the index, hence [Name] only will provide a covering index in addition to helping the following operation. And also remember that any prior executions for this client (i.e. roughly the same entity values) will cause the data pages for these indexes to remain cached in the Buffer Pool (i.e. memory).

    ;WITH cte AS
    (
      SELECT DISTINCT tmp.[Name]
      FROM   @EntityNumeroUno tmp
    )
    INSERT INTO EntityNumeroUno ([Name])
      SELECT cte.[Name]
      FROM   cte
      WHERE  NOT EXISTS(
                     SELECT *
                     FROM   EntityNumeroUno tab
                     WHERE  tab.[Name] = cte.[Name]
                       )
    
  • Step 2: INSERT all of the "messages" in simple INSERT...SELECT where the data pages for the lookup tables (i.e. the "entities") are already cached in the Buffer Pool due to Step 1
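
The streaming method described in the TVP bullet above could be sketched roughly as follows. This is only an illustration under stated assumptions: the table type dbo.NameList (a single NVARCHAR(100) column called Name), the class name TvpStreaming, and the helper ToTvpParameter are all hypothetical, and the Microsoft.Data.SqlClient package is assumed (with the older System.Data.SqlClient, SqlDataRecord lives in Microsoft.SqlServer.Server instead).

```csharp
using System.Collections.Generic;
using System.Data;
using Microsoft.Data.SqlClient;
using Microsoft.Data.SqlClient.Server;

public static class TvpStreaming
{
    // Hypothetical TVP shape, assumed to exist on the server as:
    //   CREATE TYPE dbo.NameList AS TABLE ([Name] NVARCHAR(100) NOT NULL);
    private static readonly SqlMetaData[] NameListColumns =
    {
        new SqlMetaData("Name", SqlDbType.NVarChar, 100)
    };

    // Lazily converts the collection to rows. Nothing is copied into a
    // DataTable; each row is built only when the caller asks for it.
    public static IEnumerable<SqlDataRecord> ToRecords(IEnumerable<string> names)
    {
        foreach (string name in names)
        {
            var row = new SqlDataRecord(NameListColumns);
            row.SetString(0, name);
            yield return row;
        }
    }

    // Wraps the row stream in a Structured parameter for a stored procedure call.
    public static SqlParameter ToTvpParameter(
        string parameterName, string typeName, IEnumerable<SqlDataRecord> rows)
    {
        return new SqlParameter(parameterName, SqlDbType.Structured)
        {
            TypeName = typeName,
            Value = rows
        };
    }
}
```

A call such as TvpStreaming.ToTvpParameter("@EntityNumeroUno", "dbo.NameList", TvpStreaming.ToRecords(names)) would then be added to the SqlCommand for the stored procedure. One thing to watch for: the client throws if the enumeration yields no rows at all, so for an empty collection the parameter should simply be left off the command.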


Finally, keep in mind that conjecture / assumptions / educated guesses are no substitute for testing. You need to try a few methods to see what works best for your particular situation since there might be additional details that have not been shared that could influence what is considered "ideal" here.

I will say that if the Messages are insert-only, then Vlad's idea might be faster. The method I am describing here I have used in situations that were more complex and required full syncing (updates and deletes) and did additional validations and creation of related operational data (not lookup values). Using SqlBulkCopy might be faster on straight inserts (though for only 2000 records I doubt there is much difference if any at all), but this assumes you are loading directly to the destination tables (messages and lookups) and not into intermediary / staging tables (and I believe Vlad's idea is to SqlBulkCopy directly to the destination tables). However, as stated above, using an external cache (i.e. not the Buffer Pool) is also more error prone due to the issue of updating lookup values. It could take more code than it's worth to account for invalidating an external cache, especially if using an external cache is only marginally faster. That additional risk / maintenance needs to be factored into which method is overall better for your needs.


UPDATE

Based on info provided in comments, we now know:

  • There are multiple Vendors
  • There are multiple Products offered by each Vendor
  • Products are not unique to a Vendor; Products are sold by 1 or more Vendors
  • Product properties are singular
  • Pricing info has properties that can have multiple records
  • Pricing info is INSERT-only (i.e. point-in-time history)
  • Unique Product is determined by SKU (or similar field)
  • Once created, a Product coming through with an existing SKU but different properties otherwise (e.g. category, manufacturer, etc) will be considered the same Product; the differences will be ignored

With all of this in mind, I will still recommend TVPs, but I would re-think the approach and make it Vendor-centric, not Product-centric. The assumption here is that Vendors send files whenever. So when you get a file, import it. The only lookup you would be doing ahead of time is the Vendor. Here is the basic layout:

  1. Seems reasonable to assume that you already have a VendorID at this point because why would the system be importing a file from an unknown source?
  2. You can import in batches
  3. Create a SendRows method that:
    • accepts a FileStream or something that allows for advancing through a file
    • accepts something like int BatchSize
    • returns IEnumerable<SqlDataRecord>
    • creates a SqlDataRecord to match the TVP structure
    • loops through the FileStream until either BatchSize has been met or there are no more records in the file
    • performs any necessary validations on the data
    • maps the data to the SqlDataRecord
    • calls yield return for each record
  4. Open the file
  5. While there is data in the file
    • call the stored proc
    • pass in VendorID
    • pass in SendRows(FileStream, BatchSize) for the TVP
  6. Close the file
  7. Experiment with:
    • opening the SqlConnection before the loop around the FileStream and closing it after the loops are done
    • Opening the SqlConnection, executing the stored procedure, and closing the SqlConnection inside of the FileStream loop
  8. Experiment with various BatchSize values. Start at 100, then 200, 500, etc.
  9. The stored proc will handle inserting new Products

Using this type of structure you will be sending in Product properties that are not used (i.e. only the SKU is used for the lookup of existing Products). BUT, it scales very well as there is no upper bound on file size. If the Vendor sends 50 Products, fine. If they send 50k Products, fine. If they send 4 million Products (which is the system I worked on and it did handle updating Product info that was different for any of its properties!), then fine. No increase in memory at the app layer or DB layer to handle even 10 million Products. The time the import takes should increase in step with the number of Products sent.


UPDATE 2
New details related to Source data:

  • comes from Azure EventHub
  • comes in the form of C# objects (no files)
  • Product details come in through O.P.'s system's APIs
  • is collected in a single queue (just pull data out and insert it into the database)

If the data source is C# objects then I would most definitely use TVPs as you can send them over as is via the method I described in my first update (i.e. a method that returns IEnumerable<SqlDataRecord>). Send one or more TVPs for the Price/Offer per Vendor details but regular input params for the singular Property attributes. For example:

CREATE PROCEDURE dbo.ImportProduct
(
  @SKU             VARCHAR(50),
  @ProductName     NVARCHAR(100),
  @Manufacturer    NVARCHAR(100),
  @Category        NVARCHAR(300),
  @VendorPrices    dbo.VendorPrices READONLY,
  @DiscountCoupons dbo.DiscountCoupons READONLY
)
AS
SET NOCOUNT ON;

-- Insert Product if it doesn't already exist
IF (NOT EXISTS(
         SELECT  *
         FROM    dbo.Products pr
         WHERE   pr.SKU = @SKU
              )
   )
BEGIN
  INSERT INTO dbo.Products (SKU, ProductName, Manufacturer, Category, ...)
  VALUES (@SKU, @ProductName, @Manufacturer, @Category, ...);
END;

-- ...INSERT data from TVPs
-- might need OPTION (RECOMPILE) per each TVP query to ensure proper estimated rows
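
On the C# side, a call to a procedure shaped like dbo.ImportProduct might be assembled as in the sketch below. The column layout of dbo.VendorPrices is not given above, so the three columns used here (VendorName, ListPrice, Discount, borrowed from the question's example) are an assumption, as are the VendorPrice type and the BuildCommand helper. @DiscountCoupons is left out entirely, which SQL Server treats as an empty table.

```csharp
using System.Collections.Generic;
using System.Data;
using Microsoft.Data.SqlClient;
using Microsoft.Data.SqlClient.Server;

// Hypothetical shape of one price row; not taken from the original post.
public sealed record VendorPrice(string VendorName, decimal ListPrice, decimal Discount);

public static class ImportProductCall
{
    // Assumed columns for dbo.VendorPrices; adjust to the real type definition.
    private static readonly SqlMetaData[] VendorPriceColumns =
    {
        new SqlMetaData("VendorName", SqlDbType.NVarChar, 100),
        new SqlMetaData("ListPrice", SqlDbType.Decimal, 18, 4),
        new SqlMetaData("Discount", SqlDbType.Decimal, 18, 4)
    };

    private static IEnumerable<SqlDataRecord> ToRecords(IEnumerable<VendorPrice> prices)
    {
        foreach (VendorPrice price in prices)
        {
            var row = new SqlDataRecord(VendorPriceColumns);
            row.SetString(0, price.VendorName);
            row.SetDecimal(1, price.ListPrice);
            row.SetDecimal(2, price.Discount);
            yield return row;
        }
    }

    // Builds the command; the caller assigns an open connection and executes it.
    public static SqlCommand BuildCommand(
        string sku, string productName, string manufacturer, string category,
        IReadOnlyCollection<VendorPrice> prices)
    {
        var command = new SqlCommand("dbo.ImportProduct")
        {
            CommandType = CommandType.StoredProcedure
        };

        // Singular Product attributes go in as regular parameters.
        command.Parameters.Add("@SKU", SqlDbType.VarChar, 50).Value = sku;
        command.Parameters.Add("@ProductName", SqlDbType.NVarChar, 100).Value = productName;
        command.Parameters.Add("@Manufacturer", SqlDbType.NVarChar, 100).Value = manufacturer;
        command.Parameters.Add("@Category", SqlDbType.NVarChar, 300).Value = category;

        // An omitted TVP parameter arrives as an empty table, whereas an
        // enumeration with zero rows is rejected by the client, so only
        // attach the parameter when there is at least one row.
        if (prices.Count > 0)
        {
            SqlParameter tvp = command.Parameters.Add("@VendorPrices", SqlDbType.Structured);
            tvp.TypeName = "dbo.VendorPrices";
            tvp.Value = ToRecords(prices);
        }

        return command;
    }
}
```

Keeping command construction separate from execution is just a convenience for this sketch: the caller sets command.Connection to an open SqlConnection and calls ExecuteNonQuery, once per message pulled from the queue.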


Answer 2:

From a DB point of view, nothing is as fast as BULK INSERT (from CSV files, for example). The best approach is to bulk-load all of the data as soon as possible, then process it with stored procedures.

A C# layer will just slow down the process, since all of the queries between C# and SQL Server will be thousands of times slower than what SQL Server can handle directly.