Dapper Bulk Insert Returning Serial IDs

2019-03-18 17:29发布

问题:

I am attempting to perform a bulk-insert using Dapper over Npgsql, that returns the ids of the newly inserted rows. The following insert statement is used in both of my examples:

var query = "INSERT INTO \"MyTable\" (\"Value\") VALUES (@Value) RETURNING \"ID\"";

First, I attempted to add an array of objects with a "Value" property:

var values = new[] {
    new { Value = 0.0 },
    new { Value = 0.5 }
};
var ids = connection.Query<int>(query, values);

However, that fails with the NpgsqlException: "ERROR: 42703: column "value" does not exist". After reading this question, I thought that perhaps I have to pass a DataTable object instead of an object array:

var dataTable = new DataTable();
dataTable.Columns.Add("Value", typeof(double));
dataTable.Rows.Add(0.0);
dataTable.Rows.Add(0.5);
var ids = connection.Query<int>(query, dataTable);

However, this fails with the exact same exception. How can I perform a bulk-insert and get the resulting serial ids out of Dapper over Npgsql?

I did note that the casing of the exception does not match the column name, but I am certain that I have quotes around the table and column names, so I'm not certain why it says "value" instead of "Value" in the exception. Just thought I would mention it in case it is related to the error somehow, as it is easy to overlook casing.

-- EDIT --

To clarify, this is the SQL to create the table

CREATE TABLE "MyTable" (
    "ID" SERIAL PRIMARY KEY,
    "Value" DOUBLE PRECISION NOT NULL
);

And using the variables "query" and "values" defined above, this is the code that is working on a per-row basis:

var ids = new List<int>();
foreach (var valueObj in values) {
    var queryParams = new DynamicParamaters();
    queryParams.Add("Value", valueObj.Value);
    ids.AddRange(connection.Query<int>(query, queryParams));
}

The issue is that I need to be able to insert hundreds (perhaps thousands in the near future) of rows per second into "MyTable", so waiting for this loop to iteratively send each value to the database is cumbersome and (I assume, but have yet to benchmark) time consuming. Further, I perform additional computation on the values that may or may not result in additional inserts where I need a foreign key reference to the "MyTable" entry.

Because of these issues, I am looking for an alternative that sends all values in a single statement to the database, in order to reduce network traffic and processing latency. Again, I have NOT benchmarked the iterative approach yet... what I am looking for is an alternative that does a bulk insert so I can benchmark the two approaches against each other.

回答1:

Ultimately, I came up with four different approaches to this problem. I generated 500 random values to insert into MyTable, and timed each of the four approaches (including starting and rolling back the transaction in which it was run). In my test, the database is located on localhost. However, the solution with the best performance also requires only one round trip to the database server, so the best solution I found should still beat the alternatives when deployed to a different server than the database.

Note that the variables connection and transaction are used in the following code, and are assumed to be valid Npgsql data objects. Also note that the notation Nx slower indicates an operation took an amount of time equal to the optimal solution multiplied by N.

Approach #1 (1,494ms = 18.7x slower): Unroll the array into individual parameters

public List<MyTable> InsertEntries(double[] entries)
{
    // Create a variable used to dynamically build the query
    var query = new StringBuilder(
        "INSERT INTO \"MyTable\" (\"Value\") VALUES ");

    // Create the dictionary used to store the query parameters
    var queryParams = new DynamicParameters();

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add a unique parameter for each id
    var paramIdx = 0;
    foreach (var entry in result)
    {
        var paramName = string.Format("value{1:D6}", paramIdx);
        if (0 < paramIdx++) query.Append(',');
        query.AppendFormat("(:{0})", paramName);
        queryParams.Add(paramName, entry.Value);
    }
    query.Append(" RETURNING \"ID\"");

    // Execute the query, and store the ids
    var ids = connection.Query<int>(query, queryParams, transaction);
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

I'm really not sure why this came out to be the slowest, since it only requires a single round trip to the database, but it was.

Approach #2 (267ms = 3.3x slower): Standard loop iteration

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") VALUES (:val) RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add each entry to the database
    foreach (var entry in result)
    {
        var queryParams = new DynamicParameters();
        queryParams.Add("val", entry.Value);
        entry.ID = connection.Query<int>(
            query, queryParams, transaction);
    }

    // Return the result
    return result;
}

I was shocked that this was only 2x slower than the optimal solution, but I would expect that to get significantly worse in the real environment, since this solution requires sending 500 messages to the server serially. However, this is also the simplest solution.

Approach #3 (223ms = 2.8x slower): Asynchronous loop iteration

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") VALUES (:val) RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add each entry to the database asynchronously
    var taskList = new List<Task<IEnumerable<int>>>();
    foreach (var entry in result)
    {
        var queryParams = new DynamicParameters();
        queryParams.Add("val", entry.Value);
        taskList.Add(connection.QueryAsync<int>(
            query, queryParams, transaction));
    }

    // Now that all queries have been sent, start reading the results
    for (var i = 0; i < result.Count; ++i)
    {
        result[i].ID = taskList[i].Result.First();
    }

    // Return the result
    return result;
}

This is getting better, but is still less than optimal because we can only queue as many inserts as there are available threads in the thread pool. However, this is almost as simple as the non-threaded approach, so it is a good compromise between speed and readability.

Approach #4 (134ms = 1.7x slower): Bulk inserts

This approach requires the following Postgres SQL be defined prior to running the code segment below it:

CREATE TYPE "MyTableType" AS (
    "Value" DOUBLE PRECISION
);

CREATE FUNCTION "InsertIntoMyTable"(entries "MyTableType"[])
    RETURNS SETOF INT AS $$

    DECLARE
        insertCmd TEXT := 'INSERT INTO "MyTable" ("Value") '
            'VALUES ($1) RETURNING "ID"';
        entry "MyTableType";
    BEGIN
        FOREACH entry IN ARRAY entries LOOP
            RETURN QUERY EXECUTE insertCmd USING entry."Value";
        END LOOP;
    END;
$$ LANGUAGE PLPGSQL;

And the associated code:

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "SELECT * FROM \"InsertIntoMyTable\"(:entries::\"MyTableType\")";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Convert each entry into a Postgres string
    var entryStrings = result.Select(
        e => string.Format("({0:E16})", e.Value).ToArray();

    // Create a parameter for the array of MyTable entries
    var queryParam = new {entries = entryStrings};

    // Perform the insert
    var ids = connection.Query<int>(query, queryParam, transaction);

    // Assign each id to the result
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

There are two issues that I have with this approach. The first is that I have to hard-code the ordering of the members of MyTableType. If that ordering ever changes, I have to modify this code to match. The second is that I have to convert all input values to a string prior to sending them to postgres (in the real code, I have more than one column, so I can't just change the signature of the database function to take a double precision[], unless I pass in N arrays, where N is the number of fields on MyTableType).

Despite these pitfalls, this is getting closer to ideal, and only requires one round-trip to the database.

-- BEGIN EDIT --

Since the original post, I came up with four additional approaches that are all faster than those listed above. I have modified the Nx slower numbers to reflect the new fastest method, below.

Approach #5 (105ms = 1.3x slower): Same as #4, without a dynamic query

The only difference between this approach and Approach #4 is the following change to the "InsertIntoMyTable" function:

CREATE FUNCTION "InsertIntoMyTable"(entries "MyTableType"[])
    RETURNS SETOF INT AS $$

    DECLARE
        entry "MyTableType";
    BEGIN
        FOREACH entry IN ARRAY entries LOOP
            RETURN QUERY INSERT INTO "MyTable" ("Value")
                VALUES (entry."Value") RETURNING "ID";
        END LOOP;
    END;
$$ LANGUAGE PLPGSQL;

In addition to the issues with Approach #4, the downside to this is that, in the production environment, "MyTable" is partitioned. Using this approach, I need one method per target partition.

Approach #6 (89ms = 1.1x slower): Insert statement with array argument

public List<MyTable> InsertEntries(double[] entries)
{
    const string query =
        "INSERT INTO \"MyTable\" (\"Value\") SELECT a.* FROM " +
            "UNNEST(:entries::\"MyTableType\") a RETURNING \"ID\"";

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Convert each entry into a Postgres string
    var entryStrings = result.Select(
        e => string.Format("({0:E16})", e.Value).ToArray();

    // Create a parameter for the array of MyTable entries
    var queryParam = new {entries = entryStrings};

    // Perform the insert
    var ids = connection.Query<int>(query, queryParam, transaction);

    // Assign each id to the result
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

The only downside to this is the same as the first issue with Approach #4. Namely, that it couples the implementation to the ordering of "MyTableType". Still, I found this to be my second favorite approach since it is very fast, and does not require any database functions to work correctly.

Approach #7 (80ms = VERY slightly slower): Same as #1, but without parameters

public List<MyTable> InsertEntries(double[] entries)
{
    // Create a variable used to dynamically build the query
    var query = new StringBuilder(
        "INSERT INTO \"MyTable\" (\"Value\") VALUES");

    // Get the result set without auto-assigned ids
    var result = entries.Select(e => new MyTable { Value = e }).ToList();

    // Add each row directly into the insert statement
    for (var i = 0; i < result.Count; ++i)
    {
        entry = result[i];
        query.Append(i == 0 ? ' ' : ',');
        query.AppendFormat("({0:E16})", entry.Value);
    }
    query.Append(" RETURNING \"ID\"");

    // Execute the query, and store the ids
    var ids = connection.Query<int>(query, null, transaction);
    ids.ForEach((id, i) => result[i].ID = id);

    // Return the result
    return result;
}

This is my favorite approach. It is only marginally slower than the fastest (even with 4000 records, it still runs under 1 second), but requires no special database functions or types. The only thing I don't like about it is that I have to stringify the double precision values, only to be parsed out again by Postgres. It would be preferable to send the values in binary so they took up 8 bytes instead of the massive 20 or so bytes I have allocated for them.

Approach #8 (80ms): Same as #5, but in pure sql

The only difference between this approach and Approach #5 is the following change to the "InsertIntoMyTable" function:

CREATE FUNCTION "InsertIntoMyTable"(
    entries "MyTableType"[]) RETURNS SETOF INT AS $$

    INSERT INTO "MyTable" ("Value")
        SELECT a.* FROM UNNEST(entries) a RETURNING "ID";
$$ LANGUAGE SQL;

This approach, like #5, requires one function per "MyTable" partition. This is the fastest because the query plan can be generated once for each function, then reused. In the other approaches, the query must be parsed, then planned, then executed. Despite this being the fastest, I didn't choose it due to the additional requirements on the database side over Approach #7, with very little speed benefit.