SQLite transaction for CSV importing

Posted 2019-01-29 03:18

I'm very new to Python and have been working on my Raspberry Pi to get a script up and running that imports millions of sensor-data records into SQLite. I want to wrap the inserts in transactions to make the process more efficient, breaking them into 10k-row chunks as done here: Python CSV to SQLite

So far I have

import csv, sqlite3, time

def chunks(data, rows=10000):
    for i in range (0, len(data), rows):
        yield data[i:i+rows]

if __name__ == "__main__":

    t = time.time()

con = sqlite3.connect('test.db')
cur = con.cursor()
cur.execute("DROP TABLE IF EXISTS sensor;")
cur.execute("CREATE TABLE sensor(key INT, reading REAL);")

filename = 'dummy.csv'
reader = csv.reader(open(filename,"r"))
divdata = chunks(reader)

for chunk in divdata:
    cur.execute('BEGIN TRANSACTION')

    for col1, col2 in chunk:
        cur.execute('INSERT INTO sensor (key, reading) VALUES (?, ?)', (col1, col2))

    con.execute('COMMIT')

I'm getting the following error in Python 3.2.3:

Traceback (most recent call last):
  File "/home/pi/test1.py", line 20, in <module>
    for chunk in divdata:
  File "/home/pi/test1.py", line 4, in chunks
    for i in range (0, len(data), rows):
TypeError: object of type '_csv.reader' has no len()

I'm obviously messing up somewhere in the chunks part, since everything works fine when I do basic insertion without the chunking and transactions. Any help appreciated.

3 Answers
Answer 1 · 2019-01-29 03:47

Your SQL looks okay. I do see a problem with your CSV reader, though: it doesn't support len() the way you're using it in chunks().

You could either use the more typical for row in data loop, or use one of the techniques described in this thread if you need to break the file into chunks.
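For instance, one technique along the lines of that thread is to slice the iterator with itertools.islice, which never needs len(). A minimal sketch (the function name and the 10000 default are mine, chosen to match the question):

import itertools

def chunks(iterable, size=10000):
    # Pull up to `size` rows at a time off any iterator,
    # including a csv.reader, without ever calling len()
    it = iter(iterable)
    while True:
        chunk = list(itertools.islice(it, size))
        if not chunk:
            break
        yield chunk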

Answer 2 · 霸刀☆藐视天下 · 2019-01-29 03:49

The problem is that the object returned by csv.reader does not support len(). The reader pulls rows from the file lazily, only when asked for them, so it cannot know in advance how many records the file contains.

So you need to update the chunks function to cope with not knowing how many items there are to chunk. Replace that function with this:

def chunks(data, n=10000):
    buffer = [None] * n          # pre-allocate n slots
    idx = 0
    for record in data:
        buffer[idx] = record
        idx += 1
        if idx == n:             # buffer full: emit it and start a fresh one
            yield buffer
            buffer = [None] * n
            idx = 0
    if idx > 0:                  # emit any leftover records as a shorter list
        yield buffer[:idx]

What this does: it keeps retrieving records from the underlying iterator for as long as there are records to retrieve, and every n rows it emits a list of n records; any leftover rows come out as a final, shorter list. For example:

>>> for c in chunks(range(10), 3):
...     print(c)
...
[0, 1, 2]
[3, 4, 5]
[6, 7, 8]
[9]
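Plugged into the question's import loop, this generator streams the file instead of loading it all into memory. A sketch, assuming con and cur are set up as in the question:

with open('dummy.csv', "r", newline='') as f:
    for chunk in chunks(csv.reader(f)):
        for col1, col2 in chunk:
            cur.execute('INSERT INTO sensor (key, reading) VALUES (?, ?)', (col1, col2))
        con.commit()  # one transaction per 10000-row chunk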
Answer 3 · Animai°情兽 · 2019-01-29 03:56

There were two problems in the code snippet in the question:

  1. the reader passed to chunks should have been wrapped in list(), so that len() and slicing work
  2. the commit should have used the connection's commit() method (the explicit 'BEGIN TRANSACTION' is also unnecessary: sqlite3 opens a transaction implicitly on the first INSERT, and issuing one by hand can collide with it)

See the fixed code:

import csv, sqlite3, time

def chunks(data, rows=10000):
    # data is now a list, so len() and slicing work
    for i in range(0, len(data), rows):
        yield data[i:i+rows]

if __name__ == "__main__":

    t = time.time()

    con = sqlite3.connect('test.db')
    cur = con.cursor()
    cur.execute("DROP TABLE IF EXISTS sensor;")
    cur.execute("CREATE TABLE sensor(key INT, reading REAL);")

    filename = 'dummy.csv'
    with open(filename, "r", newline='') as f:
        reader = csv.reader(f)
        divdata = chunks(list(reader))  # fix 1: materialise the reader

        for chunk in divdata:
            # sqlite3 begins a transaction implicitly on the first INSERT,
            # so no explicit 'BEGIN TRANSACTION' is needed here
            for col1, col2 in chunk:
                cur.execute('INSERT INTO sensor (key, reading) VALUES (?, ?)', (col1, col2))

            con.commit()  # fix 2: commit via the connection
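As a further speed-up beyond the two fixes above (my own suggestion, not something the answer measured), each chunk can be handed to the cursor in a single call with cursor.executemany(), which avoids the per-row Python overhead:

for chunk in divdata:
    cur.executemany('INSERT INTO sensor (key, reading) VALUES (?, ?)', chunk)
    con.commit()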