Insert data into multiple MySQL tables in C# (EF)

Published 2019-04-04 01:09

I need to insert a huge CSV file into 2 tables with a 1:n relationship in a MySQL database.

The CSV file arrives weekly, is about 1 GB, and needs to be appended to the existing data. Each of the 2 tables has an auto-increment primary key.

I've tried:

  • Entity Framework (takes most time of all approaches)
  • Datasets (same)
  • Bulk Upload (doesn't support multiple tables)
  • MySqlCommand with Parameters (needs to be nested, my current approach)
  • MySqlCommand with StoredProcedure including a Transaction

Any further suggestions?

Simplified, this is my data structure:

public class User
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public List<string> Codes { get; set; }
}

I need to insert from the CSV into this database structure:

       User   (1-n)   Code     
+---+-----+-----+ +---+---+-----+        
|PID|FName|LName| |CID|PID|Code | 
+---+-----+-----+ +---+---+-----+
| 1 |Jon  | Foo | | 1 | 1 | ed3 | 
| 2 |Max  | Foo | | 2 | 1 | wst | 
| 3 |Paul | Foo | | 3 | 2 | xsd | 
+---+-----+-----+ +---+---+-----+ 

Here is a sample line of the CSV file:

Jon;Foo;ed3,wst

A bulk load like LOAD DATA LOCAL INFILE is not possible because I have restricted write permissions.
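
For context, a minimal sketch of the nested MySqlCommand approach I mentioned above (the connection string, CSV path, and exact column names are placeholders based on the example, not my real code):

using System.IO;
using MySql.Data.MySqlClient;

// For each CSV line: insert the user, grab the auto-increment PID via
// LastInsertedId, then insert that user's codes one by one.
using (var conn = new MySqlConnection(connectionString))
{
    conn.Open();
    using (var tx = conn.BeginTransaction())
    {
        foreach (var line in File.ReadLines(csvPath))      // e.g. "Jon;Foo;ed3,wst"
        {
            var parts = line.Split(';');
            var codes = parts[2].Split(',');

            long pid;
            using (var userCmd = new MySqlCommand(
                "INSERT INTO User (FName, LName) VALUES (@f, @l)", conn, tx))
            {
                userCmd.Parameters.AddWithValue("@f", parts[0]);
                userCmd.Parameters.AddWithValue("@l", parts[1]);
                userCmd.ExecuteNonQuery();
                pid = userCmd.LastInsertedId;               // auto-increment PID
            }

            foreach (var code in codes)
            {
                using (var codeCmd = new MySqlCommand(
                    "INSERT INTO Code (PID, Code) VALUES (@p, @c)", conn, tx))
                {
                    codeCmd.Parameters.AddWithValue("@p", pid);
                    codeCmd.Parameters.AddWithValue("@c", code);
                    codeCmd.ExecuteNonQuery();
                }
            }
        }
        tx.Commit();
    }
}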

Tags: c# mysql insert
7 Answers
成全新的幸福
#2 · 2019-04-04 01:39

AFAIK, insertions into a single table are sequential, while insertions into different tables can be done in parallel. Open two separate connections to the same database and then insert in parallel, perhaps by using the Task Parallel Library.

However, if there are integrity constraints on the 1:n relationship between the tables, then:

  1. Insertions might fail, and thus any parallel insert approach would be wrong. Clearly your best bet would then be to do sequential inserts only, one table after the other.
  2. Alternatively, you can sort the data of both tables and write the InsertInto method shown below such that the insert into the second table happens only after you are done inserting the data into the first one.

Edit: Since you asked, if it is possible for you to perform the inserts in parallel, the following is a code template you can use.

private void ParallelInserts()
{
    // ... other code in the method ...

    //Read first csv into memory. It's just a GB so should be fine
    ReadFirstCSV();

    //Read second csv into memory...
    ReadSecondCSV();

    //Because the inserts will last more than a few CPU cycles...
    var taskFactory = new TaskFactory(TaskCreationOptions.LongRunning, TaskContinuationOptions.None);

    //An array to hold the two parallel inserts
    var insertTasks = new Task[2];

    //Begin insert into first table...
    insertTasks[0] = taskFactory.StartNew(() => InsertInto(commandStringFirst, connectionStringFirst));

    //Begin insert into second table...
    insertTasks[1] = taskFactory.StartNew(() => InsertInto(commandStringSecond, connectionStringSecond));

    //Let them be done...
    Task.WaitAll(insertTasks);

    Console.WriteLine("Parallel insert finished.");
}


//Defining the InsertInto method which we are passing to the tasks in the method above
private static void InsertInto(string commandString, string connectionString)
{
    using (/*open a new connection using the connectionString passed*/)
    {
        //In a while loop, iterate until you have 100/200/500 rows
        while (fileIsNotExhausted)
        {
            using (/*commandString*/)
            {
                //Execute command to insert in bulk
            }
        }
    }
}
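
For completeness, one way InsertInto could be fleshed out - a sketch only, batching 500 rows per transaction; the helper name FlushBatch, the @p0/@p1/... placeholders, and the row source are assumptions:

using System.Collections.Generic;
using MySql.Data.MySqlClient;

private static void InsertInto(string commandString, string connectionString, IEnumerable<string[]> rows)
{
    using (var conn = new MySqlConnection(connectionString))
    {
        conn.Open();
        var batch = new List<string[]>(500);
        foreach (var row in rows)
        {
            batch.Add(row);
            if (batch.Count == 500)
            {
                FlushBatch(conn, commandString, batch);
                batch.Clear();
            }
        }
        if (batch.Count > 0)
            FlushBatch(conn, commandString, batch);
    }
}

// Executes the single-row INSERT once per row, wrapped in one transaction per batch.
private static void FlushBatch(MySqlConnection conn, string commandString, List<string[]> batch)
{
    using (var tx = conn.BeginTransaction())
    using (var cmd = new MySqlCommand(commandString, conn, tx))
    {
        foreach (var values in batch)
        {
            cmd.Parameters.Clear();
            for (int i = 0; i < values.Length; i++)
                cmd.Parameters.AddWithValue("@p" + i, values[i]);   // assumes @p0, @p1, ... in commandString
            cmd.ExecuteNonQuery();
        }
        tx.Commit();
    }
}
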
混吃等死
#3 · 2019-04-04 01:43

Given the great size of the data, the best approach (performance-wise) is to leave as much of the data processing as possible to the database and not the application.

Create a temporary table in which the data from the .csv file will be temporarily saved.

CREATE TABLE `imported` (
    `id` int(11) NOT NULL,
    `firstname` varchar(45) DEFAULT NULL,
    `lastname` varchar(45) DEFAULT NULL,
    `codes` varchar(450) DEFAULT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Loading the data from the .csv into this table is pretty straightforward. I would suggest the use of MySqlCommand (which is also your current approach). Also, reusing the same MySqlConnection object for all INSERT statements will reduce the total execution time.
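
As a rough sketch of that loading step (the id value here is just a running counter, since the staging table's primary key is not auto-increment; the connection string and CSV path are placeholders):

using System.IO;
using MySql.Data.MySqlClient;

// Fill the `imported` staging table, reusing one connection and one
// parameterized command for every row of the CSV.
static void LoadCsvIntoImported(string csvPath, string connectionString)
{
    using (var conn = new MySqlConnection(connectionString))
    {
        conn.Open();
        using (var cmd = new MySqlCommand(
            "INSERT INTO imported (id, firstname, lastname, codes) VALUES (@id, @fn, @ln, @codes)", conn))
        {
            cmd.Parameters.Add("@id", MySqlDbType.Int32);
            cmd.Parameters.Add("@fn", MySqlDbType.VarChar);
            cmd.Parameters.Add("@ln", MySqlDbType.VarChar);
            cmd.Parameters.Add("@codes", MySqlDbType.VarChar);

            int id = 0;
            foreach (var line in File.ReadLines(csvPath))   // "Jon;Foo;ed3,wst"
            {
                var parts = line.Split(';');
                cmd.Parameters["@id"].Value = ++id;
                cmd.Parameters["@fn"].Value = parts[0];
                cmd.Parameters["@ln"].Value = parts[1];
                cmd.Parameters["@codes"].Value = parts[2];
                cmd.ExecuteNonQuery();
            }
        }
    }
}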

Then, to further process the data, you can create a stored procedure that will handle it.

Assuming these two tables (taken from your simplified example):

CREATE TABLE `users` (
  `PID` int(11) NOT NULL AUTO_INCREMENT,
  `FName` varchar(45) DEFAULT NULL,
  `LName` varchar(45) DEFAULT NULL,
  PRIMARY KEY (`PID`)
) ENGINE=InnoDB AUTO_INCREMENT=3737 DEFAULT CHARSET=utf8;

and

CREATE TABLE `codes` (
  `CID` int(11) NOT NULL AUTO_INCREMENT,
  `PID` int(11) DEFAULT NULL,
  `code` varchar(45) DEFAULT NULL,
  PRIMARY KEY (`CID`)
) ENGINE=InnoDB AUTO_INCREMENT=15 DEFAULT CHARSET=utf8;

you can have the following stored procedure.

CREATE DEFINER=`root`@`localhost` PROCEDURE `import_data`()
BEGIN
    DECLARE fname VARCHAR(255);
    DECLARE lname VARCHAR(255);
    DECLARE codesstr VARCHAR(255);
    DECLARE splitted_value VARCHAR(255);
    DECLARE done INT DEFAULT 0;
    DECLARE newid INT DEFAULT 0;
    DECLARE occurance INT DEFAULT 0;
    DECLARE i INT DEFAULT 0;

    DECLARE cur CURSOR FOR SELECT firstname,lastname,codes FROM imported;
    DECLARE CONTINUE HANDLER FOR NOT FOUND SET done = 1;

    OPEN cur;

    import_loop: LOOP
            FETCH cur INTO fname, lname, codesstr;
            IF done = 1 THEN
                LEAVE import_loop;
            END IF;

            INSERT INTO users (FName,LName) VALUES (fname, lname);
            SET newid = LAST_INSERT_ID();

            SET i=1;
            SET occurance = (SELECT LENGTH(codesstr) - LENGTH(REPLACE(codesstr, ',', '')) + 1);

            WHILE i <= occurance DO
                SET splitted_value =
                    (SELECT REPLACE(SUBSTRING(SUBSTRING_INDEX(codesstr, ',', i),
                    LENGTH(SUBSTRING_INDEX(codesstr, ',', i - 1)) + 1), ',', ''));

                INSERT INTO codes (PID, code) VALUES (newid, splitted_value);
                SET i = i + 1;
            END WHILE;
        END LOOP;
    CLOSE cur;
END

For every row in the source data, it issues an INSERT statement for the users table. Then a WHILE loop splits the comma-separated codes and issues an INSERT statement into the codes table for each one.

Regarding the use of LAST_INSERT_ID(), it is reliable on a PER CONNECTION basis (see doc here). If the MySQL connection used to run this stored procedure is not used by other transactions, the use of LAST_INSERT_ID() is safe.

The ID that was generated is maintained in the server on a per-connection basis. This means that the value returned by the function to a given client is the first AUTO_INCREMENT value generated for most recent statement affecting an AUTO_INCREMENT column by that client. This value cannot be affected by other clients, even if they generate AUTO_INCREMENT values of their own. This behavior ensures that each client can retrieve its own ID without concern for the activity of other clients, and without the need for locks or transactions.

Edit: Here is the variant requested by the OP that omits the temporary imported table. Instead of inserting the data from the .csv into the imported table, you call the SP to store it directly in your database.

CREATE DEFINER=`root`@`localhost` PROCEDURE `import_data`(IN fname VARCHAR(255), IN lname VARCHAR(255),IN codesstr VARCHAR(255))
BEGIN
    DECLARE splitted_value VARCHAR(255);
    DECLARE done INT DEFAULT 0;
    DECLARE newid INT DEFAULT 0;
    DECLARE occurance INT DEFAULT 0;
    DECLARE i INT DEFAULT 0;

    INSERT INTO users (FName,LName) VALUES (fname, lname);
    SET newid = LAST_INSERT_ID();

    SET i=1;
    SET occurance = (SELECT LENGTH(codesstr) - LENGTH(REPLACE(codesstr, ',', '')) + 1);

    WHILE i <= occurance DO
        SET splitted_value =
            (SELECT REPLACE(SUBSTRING(SUBSTRING_INDEX(codesstr, ',', i),
            LENGTH(SUBSTRING_INDEX(codesstr, ',', i - 1)) + 1), ',', ''));

        INSERT INTO codes (PID, code) VALUES (newid, splitted_value);
        SET i = i + 1;
    END WHILE;
END

Note: The code to split the codes is taken from here (MySQL does not provide a split function for strings).
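
And a minimal C# sketch of calling that parameterized procedure once per CSV line (connection string and CSV path are placeholders):

using System.Data;
using System.IO;
using MySql.Data.MySqlClient;

// Call import_data(fname, lname, codesstr) for every line of the CSV.
static void ImportViaStoredProcedure(string csvPath, string connectionString)
{
    using (var conn = new MySqlConnection(connectionString))
    {
        conn.Open();
        using (var cmd = new MySqlCommand("import_data", conn))
        {
            cmd.CommandType = CommandType.StoredProcedure;
            cmd.Parameters.Add("fname", MySqlDbType.VarChar);
            cmd.Parameters.Add("lname", MySqlDbType.VarChar);
            cmd.Parameters.Add("codesstr", MySqlDbType.VarChar);

            foreach (var line in File.ReadLines(csvPath))   // "Jon;Foo;ed3,wst"
            {
                var parts = line.Split(';');
                cmd.Parameters["fname"].Value = parts[0];
                cmd.Parameters["lname"].Value = parts[1];
                cmd.Parameters["codesstr"].Value = parts[2];
                cmd.ExecuteNonQuery();
            }
        }
    }
}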

别忘想泡老子
#4 · 2019-04-04 01:46

I developed my WPF application using Entity Framework with a SQL Server database, and I needed to read data from an Excel file and insert it into 2 tables that have a relationship between them. For roughly 15,000 rows in Excel it used to take around 4 hours. Then I switched to a block of 500 rows per insert, which sped up the insertion unbelievably, and it now takes a mere 3-5 seconds to import the same data.

So I would suggest you add your rows to the context 100/200/500 at a time and then call the SaveChanges method (if you really want to be using EF). There are other helpful tips to speed up EF performance as well. Please read this for reference.

var totalRecords = TestPacksData.Rows.Count;
var totalPages = (totalRecords / ImportRecordsPerPage) + 1;
while (count <= totalPages)
{
     var pageWiseRecords = TestPacksData.Rows.Cast<DataRow>().Skip(count * ImportRecordsPerPage).Take(ImportRecordsPerPage);
     count++;
     Project.CreateNewSheet(pageWiseRecords.ToList());
     Project.CreateNewSpool(pageWiseRecords.ToList());
}

And here is the CreateNewSheet method

/// <summary>
/// Creates a new Sheet record in the database
/// </summary>
/// <param name="rows">List of DataRows containing the Sheet records</param>
public void CreateNewSheet(List<DataRow> rows)
{
     var tempSheetsList = new List<Sheet>();
     foreach (var row in rows)
     {
         var sheetNo = row[SheetFields.Sheet_No.ToString()].ToString();
         if (string.IsNullOrWhiteSpace(sheetNo))
              continue;
         var testPackNo = row[SheetFields.Test_Pack_No.ToString()].ToString();
         TestPack testPack = null;
         if (!string.IsNullOrWhiteSpace(testPackNo))
              testPack = GetTestPackByTestPackNo(testPackNo);

         var existingSheet = GetSheetBySheetNo(sheetNo);
         if (existingSheet != null)
         {
             UpdateSheet(existingSheet, row);
             continue;
         }

         var isometricNo = GetIsometricNoFromSheetNo(sheetNo);
         var newSheet = new Sheet
         {
             sheet_no = sheetNo,
             isometric_no = isometricNo,
             ped_rev = row[SheetFields.PED_Rev.ToString()].ToString(),
             gpc_rev = row[SheetFields.GPC_Rev.ToString()].ToString()
         };
         if (testPack != null)
         {
             newSheet.test_pack_id = testPack.id;
             newSheet.test_pack_no = testPack.test_pack_no;
         }
         if (!tempSheetsList.Any(l => l.sheet_no == newSheet.sheet_no))
         {
              DataStore.Context.Sheets.Add(newSheet);
              tempSheetsList.Add(newSheet);
         }
   }
   try
   {
        DataStore.Context.SaveChanges();
        DataStore.Dispose(); // This is very important: dispose the context.
   }
   catch (DbEntityValidationException ex)
   {
       // Create log for the exception here
   }
}

CreateNewSpool is exactly the same method except for the field names and table name, because it updates a child table. But the idea is the same.
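
For reference, a generic sketch of the batching idea combined with the usual EF tips mentioned above (MyDbContext, Users, and the users list are placeholder names, not the actual model):

using System.Linq;

const int batchSize = 500;
for (int offset = 0; offset < users.Count; offset += batchSize)
{
    // A fresh, short-lived context per batch keeps the change tracker small.
    using (var ctx = new MyDbContext())
    {
        ctx.Configuration.AutoDetectChangesEnabled = false;   // EF6: skip change detection on every Add
        ctx.Configuration.ValidateOnSaveEnabled = false;

        foreach (var user in users.Skip(offset).Take(batchSize))
            ctx.Users.Add(user);   // child rows follow via the navigation property, if mapped

        ctx.SaveChanges();         // one SaveChanges per batch, not per row
    }
}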

够拽才男人
#5 · 2019-04-04 01:53

1 - Add a VirtualId column to the User table & class.

EDITED 2 - Assign numbers in a loop to the VirtualId field of each User object (use negative numbers starting at -1 to avoid collisions in the last step). For each Code object c belonging to a User object u, set c.UserId = u.VirtualId.

3 - Bulk load the Users into the User table and the Codes into the Code table.

4 - UPDATE Code C, User U SET C.UserId = U.Id WHERE C.UserId = U.VirtualId.

NOTE: If you have an FK constraint on Code.UserId, you can drop it and re-add it after the insert.

public class User
{
    public int Id { get; set; }
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public int VirtualId { get; set; }

}

public class Code
{
    public int Id { get; set; }
    public string Value { get; set; }   // renamed: a member cannot have the same name as its enclosing class
    public int UserId { get; set; }     // int, to match the User.Id / VirtualId values
}
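
A rough sketch of steps 2 and 4 (step 3, the actual bulk load, is whatever loader you have available; users, codes, and connectionString are placeholders, and the table/column names follow the example above):

using MySql.Data.MySqlClient;

// Step 2: hand out temporary negative VirtualIds and propagate them to the codes.
int nextVirtualId = -1;
foreach (var user in users)
{
    user.VirtualId = nextVirtualId--;
    foreach (var code in user.Codes)          // however the codes hang off your user objects
        code.UserId = user.VirtualId;
}

// Step 3: bulk load `users` into User and `codes` into Code here.

// Step 4: rewrite the temporary ids to the real auto-increment ids.
using (var conn = new MySqlConnection(connectionString))
{
    conn.Open();
    using (var cmd = new MySqlCommand(
        "UPDATE Code C, User U SET C.UserId = U.Id WHERE C.UserId = U.VirtualId", conn))
    {
        cmd.ExecuteNonQuery();
    }
}
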
ゆ 、 Hurt°
#6 · 2019-04-04 01:56

Can you break the CSV into two files?

E.g. Suppose your file has the following columns:

... A ... | ... B ... 
a0 | b0
a0 | b1
a0 | b2    <-- data
a1 | b3
a1 | b4

So one set of A might have multiple B entries. After you break it apart, you get:

... A ...
a0
a1

... B ...
b0
b1
b2
b3
b4

Then you bulk insert them separately.

Edit: Pseudo code

Based on the conversation, something like:

DataTable tableA = ...; // query schema for TableA
DataTable tableB = ...; // query schema for TableB

List<String> usernames = select distinct username from TableA;
Hashtable htUsername = new Hashtable(StringComparer.InvariantCultureIgnoreCase);
foreach (String username in usernames)
     htUsername[username] = "";

int colUsername = ...;
foreach (String[] row in CSVFile) {
    String un = row[colUsername] as String;
    if (htUsername[un] == null) {
        // add new row to tableA
        DataRow newRow = tableA.NewRow();
        newRow["Username"] = un;
        // etc.
        tableA.Rows.Add(newRow);
        htUsername[un] = "";
    }
}

// bulk insert TableA

select userid, username from TableA
Hashtable htUserId = new Hashtable(StringComparer.InvariantCultureIgnoreCase);
// htUserId[username] = userid;

int colUserId = ...;

foreach (String[] row in CSVFile) {

    String un = row[colUsername] as String;
    int userid = (int) htUserId[un];
    DataRow newRow = tableB.NewRow();
    newRow[colUserId] = userid;
    // fill in other values
    tableB.Rows.Add(newRow);
    if (tableB.Rows.Count == 65000) {
        // bulk insert TableB
        var t = tableB.Clone();
        tableB.Dispose();
        tableB = t;
    }
}

if (tableB.Rows.Count > 0)
    // bulk insert TableB
爷、活的狠高调
#7 · 2019-04-04 02:00

When you say "efficiently" are you talking memory, or time?

In terms of improving the speed of the inserts, if you can do multiple value blocks per insert statement, you can get 500% improvement in speed. I did some benchmarks on this over in this question: Which is faster: multiple single INSERTs or one multiple-row INSERT?

My approach is described in the answer, but simply put, reading in up to say 50 "rows" (to be inserted) at once and bundling them into a single INSERT INTO(...), VALUES(...),(...),(...)...(...),(...) type statement seems to really speed things up. At least, if you're restricted from using a bulk load.

Another approach, by the way, if you have live data you can't drop indexes on during the upload, is to create a MEMORY table on the MySQL server without indexes, dump the data there, and then do an INSERT INTO live SELECT * FROM mem. Though that uses more memory on the server, hence the question at the start of this answer about what you mean by "efficiently". :)
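
A hedged sketch of that staging idea (mem_users is a placeholder name; the column list follows the users table from the example schema, and the rows would be pumped into it with the multi-VALUES blocks shown below):

using MySql.Data.MySqlClient;

using (var conn = new MySqlConnection(connectionString))
{
    conn.Open();

    // Stage into an index-free MEMORY table...
    new MySqlCommand(
        "CREATE TABLE mem_users (FName VARCHAR(45), LName VARCHAR(45)) ENGINE=MEMORY",
        conn).ExecuteNonQuery();

    // ... insert the CSV rows into mem_users here (e.g. multi-VALUES blocks as below) ...

    // ... then copy everything into the live table in a single statement and clean up.
    new MySqlCommand(
        "INSERT INTO users (FName, LName) SELECT FName, LName FROM mem_users",
        conn).ExecuteNonQuery();
    new MySqlCommand("DROP TABLE mem_users", conn).ExecuteNonQuery();
}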

Oh, and there's probably nothing wrong with iterating through the file and doing all the first table inserts first, and then doing the second table ones. Unless the data is being used live, I guess. In that case you could definitely still use the bundled approach, but the application logic to do that is a lot more complex.

UPDATE: OP requested example C# code for multivalue insert blocks.

Note: this code assumes you have a number of structures already configured:

  1. tables List<string> - table names to insert into
  2. fieldslist Dictionary<string, List<String>> - list of field names for each table
  3. typeslist Dictionary<string, List<MySqlDbType>> - list of MySqlDbTypes for each table, same order as the field names.
  4. nullslist Dictionary<string, List<Boolean>> - list of flags to tell if a field is nullable or not, for each table (same order as field names).
  5. prikey Dictionary<string, string> - list of primary key field name, per table (note: this doesn't support multiple field primary keys, though if you needed it you could probably hack it in - I think somewhere I have a version that does support this, but... meh).
  6. theData Dictionary<string, List<Dictionary<int, object>>> - the actual data, as a list of fieldnum-value dictionaries, per table.

Oh yeah, and localcmd is a MySqlCommand created by calling CreateCommand() on the local MySqlConnection object.

Further note: I wrote this quite a while back when I was kind of starting. If this causes your eyes or brain to bleed, I apologise in advance :)

const int perinsert = 50;
foreach (string table in tables)
{
    string[] fields = fieldslist[table].ToArray();
    MySqlDbType[] types = typeslist[table].ToArray();
    bool[] nulls = nullslist[table].ToArray();

    int thisblock = perinsert;
    int rowstotal = theData[table].Count;
    int rowsremainder = rowstotal % perinsert;
    int rowscopied = 0;

    // Do the bulk (multi-VALUES block) INSERTs, but only if we have more rows than there are in a single bulk insert to perform:
    while (rowscopied < rowstotal)
    {
        if (rowstotal - rowscopied < perinsert)
            thisblock = rowstotal - rowscopied;
        // Generate a multi-VALUES prepared INSERT statement for this block:
        List<string> extravals = new List<string>();
        for (int j = 0; j < thisblock; j++)
            extravals.Add(String.Format("(@{0}_{1})", j, String.Join(String.Format(", @{0}_", j), fields)));
        localcmd.CommandText = String.Format("INSERT INTO {0} VALUES{1}", table, String.Join(",", extravals.ToArray()));
        // Now create the parameters to match these:
        for (int j = 0; j < thisblock; j++)
            for (int i = 0; i < fields.Length; i++)
                localcmd.Parameters.Add(String.Format("{0}_{1}", j, fields[i]), types[i]).IsNullable = nulls[i];

        // Keep doing bulk INSERTs until there's less rows left than we need for another one:
        while (rowstotal - rowscopied >= thisblock)
        {
            // Queue up all the VALUES for this block INSERT:
            for (int j = 0; j < thisblock; j++)
            {
                Dictionary<int, object> row = theData[table][rowscopied++];
                for (int i = 0; i < fields.Length; i++)
                    localcmd.Parameters[String.Format("{0}_{1}", j, fields[i])].Value = row[i];
            }
            // Run the query:
            localcmd.ExecuteNonQuery();
        }
        // Clear all the parameters - we're done here:
        localcmd.Parameters.Clear();
    }
}