我设计一个MySQL
一个需要处理每秒大约600行插入在各个InnoDB表数据库。 我目前的实现使用非成批的预处理语句。 然而,写MySQL
数据库的瓶颈,并随着时间的推移我的队列大小增加。
实现是用Java编写的,我不知道这个版本了手。 它使用MySQL
的Java连接 。 我需要考虑切换到JDBC
的明天。 我猜想这是两个不同的连接器包。
我已阅读关于这个问题的以下主题:
- 优化的MySQL插入处理的数据流
- MyISAM数据与InnoDB的
- 插入二进制数据到MySQL(没有的PreparedStatement的)
并从MySQL网站:
- http://dev.mysql.com/doc/refman/5.0/en/insert-speed.html
我的问题是:
有没有人对使用插入在批处理模式准备语句与使用单一的性能差异的建议或经验INSERT
语句中有多个值。
什么是之间的性能差异MySQL
的Java连接器与JDBC
。 我应该使用一个或其他?
该表是用于归档的目的,而且会看到〜90%写入〜读的10%(甚至更低)。 我使用的是InnoDB。 这是对MyISAM数据是正确的选择?
预先感谢您的帮助。
JDBC是一个简单的数据库访问的Java SE标准提供的标准接口,因此你没有真正绑定到特定的JDBC实现。 MySQL的Java连接器(Connector / J)为仅MySQL数据库的JDBC接口的实现。 出来的经验,我参与到使用使用MySQL大数据量的一个项目,我们大多喜欢的MyISAM为可以生成的数据:它允许实现更高的性能会丢失一些事务,但总体来讲,MyISAM的速度更快,但是InnoDB的是更加可靠。
我想知道的INSERT语句过于有关的性能一年前,发现下面的旧测试代码在我的代码货架(对不起,这是一个有点复杂,有点出你的问题范围)。 下面的代码包含的插入所述测试数据的4种方式的例子:
- 单
INSERT
S; - 批处理
INSERT
秒; - 手动批量
INSERT
(从来没有使用它-这是危险的); - 最后制备散装
INSERT
)。
它使用TestNG的作为亚军,并使用一些自定义代码的遗产,如:
- 在
runWithConnection()
方法-确保执行后回调(但下面的代码使用不声明关闭可靠的策略-即使没有连接被关闭或放回连接池try
/ finally
以减少代码); -
IUnsafeIn<T, E extends Throwable>
-用于接受一个单一的参数,但可能抛出型E的例外,像的方法的自定义回调接口: void handle(T argument) throws E;
。
package test;
import test.IUnsafeIn;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import static java.lang.String.format;
import static java.lang.String.valueOf;
import static java.lang.System.currentTimeMillis;
import core.SqlBaseTest;
import org.testng.annotations.AfterSuite;
import org.testng.annotations.BeforeSuite;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
public final class InsertVsBatchInsertTest extends SqlBaseTest {
private static final int ITERATION_COUNT = 3000;
private static final String CREATE_TABLE_QUERY = "CREATE TABLE IF NOT EXISTS ttt1 (c1 INTEGER, c2 FLOAT, c3 VARCHAR(5)) ENGINE = InnoDB";
private static final String DROP_TABLE_QUERY = "DROP TABLE ttt1";
private static final String CLEAR_TABLE_QUERY = "DELETE FROM ttt1";
private static void withinTimer(String name, Runnable runnable) {
final long start = currentTimeMillis();
runnable.run();
logStdOutF("%20s: %d ms", name, currentTimeMillis() - start);
}
@BeforeSuite
public void createTable() {
runWithConnection(new IUnsafeIn<Connection, SQLException>() {
@Override
public void handle(Connection connection) throws SQLException {
final PreparedStatement statement = connection.prepareStatement(CREATE_TABLE_QUERY);
statement.execute();
statement.close();
}
});
}
@AfterSuite
public void dropTable() {
runWithConnection(new IUnsafeIn<Connection, SQLException>() {
@Override
public void handle(Connection connection) throws SQLException {
final PreparedStatement statement = connection.prepareStatement(DROP_TABLE_QUERY);
statement.execute();
statement.close();
}
});
}
@BeforeTest
public void clearTestTable() {
runWithConnection(new IUnsafeIn<Connection, SQLException>() {
@Override
public void handle(Connection connection) throws SQLException {
final PreparedStatement statement = connection.prepareStatement(CLEAR_TABLE_QUERY);
statement.execute();
statement.close();
}
});
}
@Test
public void run1SingleInserts() {
withinTimer("Single inserts", new Runnable() {
@Override
public void run() {
runWithConnection(new IUnsafeIn<Connection, SQLException>() {
@Override
public void handle(Connection connection) throws SQLException {
for ( int i = 0; i < ITERATION_COUNT; i++ ) {
final PreparedStatement statement = connection.prepareStatement("INSERT INTO ttt1 (c1, c2, c3) VALUES (?, ?, ?)");
statement.setInt(1, i);
statement.setFloat(2, i);
statement.setString(3, valueOf(i));
statement.execute();
statement.close();
}
}
});
}
});
}
@Test
public void run2BatchInsert() {
withinTimer("Batch insert", new Runnable() {
@Override
public void run() {
runWithConnection(new IUnsafeIn<Connection, SQLException>() {
@Override
public void handle(Connection connection) throws SQLException {
final PreparedStatement statement = connection.prepareStatement("INSERT INTO ttt1 (c1, c2, c3) VALUES (?, ?, ?)");
for ( int i = 0; i < ITERATION_COUNT; i++ ) {
statement.setInt(1, i);
statement.setFloat(2, i);
statement.setString(3, valueOf(i));
statement.addBatch();
}
statement.executeBatch();
statement.close();
}
});
}
});
}
@Test
public void run3DirtyBulkInsert() {
withinTimer("Dirty bulk insert", new Runnable() {
@Override
public void run() {
runWithConnection(new IUnsafeIn<Connection, SQLException>() {
@Override
public void handle(Connection connection) throws SQLException {
final StringBuilder builder = new StringBuilder("INSERT INTO ttt1 (c1, c2, c3) VALUES ");
for ( int i = 0; i < ITERATION_COUNT; i++ ) {
if ( i != 0 ) {
builder.append(",");
}
builder.append(format("(%s, %s, '%s')", i, i, i));
}
final String query = builder.toString();
final PreparedStatement statement = connection.prepareStatement(query);
statement.execute();
statement.close();
}
});
}
});
}
@Test
public void run4SafeBulkInsert() {
withinTimer("Safe bulk insert", new Runnable() {
@Override
public void run() {
runWithConnection(new IUnsafeIn<Connection, SQLException>() {
private String getInsertPlaceholders(int placeholderCount) {
final StringBuilder builder = new StringBuilder("(");
for ( int i = 0; i < placeholderCount; i++ ) {
if ( i != 0 ) {
builder.append(",");
}
builder.append("?");
}
return builder.append(")").toString();
}
@SuppressWarnings("AssignmentToForLoopParameter")
@Override
public void handle(Connection connection) throws SQLException {
final int columnCount = 3;
final StringBuilder builder = new StringBuilder("INSERT INTO ttt1 (c1, c2, c3) VALUES ");
final String placeholders = getInsertPlaceholders(columnCount);
for ( int i = 0; i < ITERATION_COUNT; i++ ) {
if ( i != 0 ) {
builder.append(",");
}
builder.append(placeholders);
}
final int maxParameterIndex = ITERATION_COUNT * columnCount;
final String query = builder.toString();
final PreparedStatement statement = connection.prepareStatement(query);
int valueIndex = 0;
for ( int parameterIndex = 1; parameterIndex <= maxParameterIndex; valueIndex++ ) {
statement.setObject(parameterIndex++, valueIndex);
statement.setObject(parameterIndex++, valueIndex);
statement.setObject(parameterIndex++, valueIndex);
}
statement.execute();
statement.close();
}
});
}
});
}
}
看看与@Test注解的方法:他们实际执行INSERT
语句。 还请大家看看CREATE_TABLE_QUERY
不变:在源代码中,它使用InnoDB的安装(MySQL连接/ J 5.1.12)的MySQL 5.5在生产我的机器,结果如下:
InnoDB
Single inserts: 74148 ms
Batch insert: 84370 ms
Dirty bulk insert: 178 ms
Safe bulk insert: 118 ms
如果更改CREATE_TABLE_QUERY
的InnoDB在MyISAM,你会看到显著的性能提升:
MyISAM
Single inserts: 604 ms
Batch insert: 447 ms
Dirty bulk insert: 63 ms
Safe bulk insert: 26 ms
希望这可以帮助。
UPD:
第四届方式,你必须正确地自定义max_allowed_packet
在mysql.ini
(中[mysqld]
部分)要大到足以支持真正的大包。
我知道这个线程是很老,但我只是想我要提及的是,如果你使用MySQL时,加上“rewriteBatchedStatements = true”添加到JDBC URL,它可以使用批处理的语句时,造成巨大的性能提升。
你有任何受影响的表中的任何触发器? 如果没有,每秒600个插入看起来并不像很多。
从JDBC批量插入功能将多次发出相同的语句在同一个事务,而多值SQL将挤在一个声明中的所有值。 在多值声明的情况下,你将不得不构建插入SQL动态,这可能是在更多的代码方面的开销,更多的内存,SQL注入防护机制等首先尝试定期批处理功能,为您的工作负载,它不应该是一个问题。
如果您没有收到批量数据再考虑插入之前分批它。 我们用单独的线程队列来实现生产者 - 消费者的安排。 在此我们忍住插入,直到经过一定时间或队列的大小超过阈值。
如果你想生产者通知有关插入成功,则需要更多的管道。
有时候,只是阻塞的线程可以更直接和实际。
if(System.currentTimeMills()-lastInsertTime>TIME_THRESHOLD || queue.size()>SIZE_THRESHOLD) {
lastInsertTime=System.currentTimeMills();
// Insert logic
} else {
// Do nothing OR sleep for some time OR retry after some time.
}
经过我自己的一些测试约旦升给了最好的提示。 我认为执行时间Lyubomyr已经为InnoDB的非脏批量插入给定是错误的,因为他很可能没有JDBC连接字符串中使用“rewriteBatchedStatements =真”。 没有它,批次是毫无价值的。 在我自己的测试使用预处理语句非脏批量插入甚至快于准备好的语句做的肮脏的方式。
文章来源: Performance of MySQL Insert statements in Java: Batch mode prepared statements vs single insert with multiple values