I'm looking for a way to define custom quoting with csv.writer in Python. There are 4 built-in ways to quote values: csv.QUOTE_ALL, csv.QUOTE_MINIMAL, csv.QUOTE_NONNUMERIC, csv.QUOTE_NONE.
However, I need a quoting mechanism that emulates Postgres' FORCE QUOTE *, i.e. one that quotes all non-None values. With csv.QUOTE_ALL, Python turns None into a quoted empty string (""), but I would like an unquoted empty string instead.
Is it possible to do that with the built-in csv module (I'm not interested in hacks, I'm already doing that :P)? Or am I forced to write/get some custom csv parser? And generally: is it possible to write a custom quoting mechanism for the csv module?
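To illustrate the gap (a minimal, hypothetical example): with csv.QUOTE_ALL, a None field and a real empty string come out identically, whereas the output I'm after would leave the None field completely empty and unquoted, e.g. "1",,"a".

```python
import csv
import io

# With QUOTE_ALL, None is written as a quoted empty field ("") --
# indistinguishable from an actual empty string.
buf = io.StringIO()
writer = csv.writer(buf, quoting=csv.QUOTE_ALL)
writer.writerow([1, None, "a"])
print(repr(buf.getvalue()))  # '"1","","a"\r\n' -- but I want '"1",,"a"\r\n'
```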
Disable csv quoting and add the quotes yourself:
    import csv

    def quote(col):
        if col is None:
            return ''
        # uses double-quoting style to escape existing quotes
        return '"{}"'.format(str(col).replace('"', '""'))

    writer = csv.writer(fileobj, quoting=csv.QUOTE_NONE, escapechar='', quotechar='')
    for row in rows:
        writer.writerow(map(quote, row))
By setting both escapechar and quotechar to empty strings you stop the module from quoting your already-quoted values. The above works as long as you don't use the delimiter in the csv values.
Note that at this point it would just be easier to write the comma-delimited lines yourself:

    # newline='' so the '\r\n' we write is not translated by the text layer
    with open(filename, 'w', newline='') as fd:
        for row in rows:
            fd.write(','.join(map(quote, row)) + '\r\n')
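As a quick sanity check (hypothetical sample row, with the quote() helper repeated here so the snippet runs standalone), a None value produces an unquoted empty field while everything else is quoted:

```python
def quote(col):
    if col is None:
        return ''
    # double-quoting style for embedded quotes
    return '"{}"'.format(str(col).replace('"', '""'))

row = [1, None, 'say "hi"']
line = ','.join(map(quote, row)) + '\r\n'
print(repr(line))  # '"1",,"say ""hi"""\r\n'
```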
I've written my own csv writer which does exactly what I want:
    class PostgresCSVWriter(object):
        def __init__(self, stream, quotechar="\"", delimiter=",", escapechar="\\"):
            self.stream = stream
            self.quotechar = quotechar
            self.delimiter = delimiter
            self.escapechar = escapechar
            self.buffer_size = 16384

        def _convert_value(self, obj):
            # None becomes an unquoted empty field; everything else is quoted
            if obj is None:
                return ""
            value = str(obj)
            value = value.replace(self.quotechar, self.quotechar + self.quotechar)
            value = value.replace(self.delimiter, self.escapechar + self.delimiter)
            return self.quotechar + value + self.quotechar

        def _convert_row(self, row):
            return self.delimiter.join(self._convert_value(v) for v in row) + "\r\n"

        def writerow(self, row):
            self.stream.write(self._convert_row(row))

        def writerows(self, rows):
            # accumulate rows and flush once buffer_size characters pile up
            data = ""
            counter = 0
            for row in rows:
                buf = self._convert_row(row)
                data += buf
                counter += len(buf)
                if counter >= self.buffer_size:
                    self.stream.write(data)
                    data = ""
                    counter = 0
            if data:
                self.stream.write(data)
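For example (hypothetical values; the per-value conversion is repeated inline so the snippet runs standalone), a row like [1, None, 'say "hi"'] comes out with the None field empty and unquoted:

```python
def convert_value(obj, quotechar='"', delimiter=',', escapechar='\\'):
    # mirrors PostgresCSVWriter._convert_value above
    if obj is None:
        return ""
    value = str(obj).replace(quotechar, quotechar + quotechar)
    value = value.replace(delimiter, escapechar + delimiter)
    return quotechar + value + quotechar

row = [1, None, 'say "hi"']
line = ",".join(convert_value(v) for v in row) + "\r\n"
print(repr(line))  # '"1",,"say ""hi"""\r\n'
```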
If anyone sees any problem with it, please let me know. I'm still looking for a solution with the csv module, though.