We have pretty large files, on the order of 1-1.5 GB combined (mostly log files), with raw data that is easily parseable to a csv, which is then supposed to be graphed to generate a set of graph images.
Currently, we are using bash scripts to turn the raw data into a csv file, with just the numbers that need to be graphed, and then feeding it into a gnuplot script. But this process is extremely slow. I tried to speed up the bash scripts by replacing some piped cuts, trs etc. with a single awk command. Although this improved the speed, the whole thing is still very slow.
So, I am starting to believe there are better tools for this process. I am currently looking to rewrite this process in Python+NumPy or R. A friend of mine suggested using the JVM, and if I am to do that, I will use Clojure, but I am not sure how the JVM will perform.
I don't have much experience in dealing with this kind of problem, so any advice on how to proceed would be great. Thanks.
Edit: Also, I will want to store (to disk) the generated intermediate data, i.e., the csv, so I don't have to re-generate it should I want a different-looking graph.
Edit 2: The raw data files have one record per line, whose fields are separated by a delimiter (|). Not all fields are numbers. Each field I need in the output csv is obtained by applying a certain formula to the input records, which may use multiple fields from the input data. The output csv will have 3-4 fields per line, and I need graphs that plot fields 1-2, 1-3, 1-4 in (maybe) a bar chart. I hope that gives a better picture.
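To make the format described in Edit 2 concrete, here is a minimal Python sketch of the raw-record-to-csv step. The field layout (timestamp, label, bytes, milliseconds) and the throughput formula in record_to_row are purely hypothetical stand-ins, since the question does not give the real ones; only the `|` delimiter and the 3-4 output columns come from the question.

```python
import csv
import io


def record_to_row(fields):
    """Hypothetical formula: combine several raw fields into 3 output columns."""
    # Assumed layout: timestamp | label | bytes | millis (only some are numeric).
    timestamp = int(fields[0])
    nbytes = float(fields[2])
    millis = float(fields[3])
    throughput = nbytes / millis if millis else 0.0
    return [timestamp, nbytes, round(throughput, 3)]


def convert(lines, out):
    """Write one csv row per parseable input line; return the number of rows written."""
    writer = csv.writer(out, lineterminator="\n")
    written = 0
    for line in lines:
        fields = line.rstrip("\n").split("|")
        try:
            writer.writerow(record_to_row(fields))
        except (IndexError, ValueError):
            continue  # skip malformed records instead of aborting the run
        written += 1
    return written


# Tiny in-memory demonstration; the second record has a non-numeric field.
sample = ["1|GET|2048|4\n", "2|PUT|oops|5\n", "3|GET|1000|8\n"]
buf = io.StringIO()
count = convert(sample, buf)
```

Because convert only iterates over its input, an open file object can be passed in directly, so the log is streamed line by line rather than loaded whole, and the csv written to disk can be reused for later re-plotting as Edit 1 asks.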
Edit 3: I have modified @adirau's script a little and it seems to be working pretty well. I have come far enough that I am reading data, sending to a pool of processor threads (pseudo processing, append thread name to data), and aggregating it into an output file, through another collector thread.
PS: I am not sure about the tagging of this question, feel free to correct it.
Python sounds like a good choice because it has a good threading API (the implementation is questionable though), matplotlib and pylab. I am missing some specs from your end, but maybe this could be a good starting point for you: matplotlib: async plotting with threads.
I would go for a single thread handling bulk disk I/O reads and synchronized queueing to a pool of threads for data processing. (If you have fixed record lengths, things may get faster by precomputing read offsets and passing just the offsets to the thread pool.) In the disk I/O thread I would mmap the data source files and read a predefined number of bytes, plus one more read to grab the remaining bytes up to the end of the current input line. The number of bytes should be chosen somewhere near your average input line length. Next comes feeding the pool via the queue, and the data processing / plotting that takes place in the thread pool. I don't have a good picture here (of what exactly you are plotting), but I hope this helps.
EDIT: there's file.readlines([sizehint]) to grab multiple lines at once; it may not be that speedy, though, because the docs say it uses readline() internally.
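As a rough illustration of the readlines([sizehint]) idea, the sketch below batches a file-like object into lists of complete lines. The helper name batches and the in-memory sample data are made up for the example; with a real log you would pass a file opened in binary mode instead.

```python
import io


def batches(f, sizehint=1024):
    """Yield lists of complete lines, each list totalling roughly sizehint bytes."""
    while True:
        lines = f.readlines(sizehint)
        if not lines:
            return  # end of file
        yield lines


# In-memory stand-in for a log file with 1000 pipe-delimited records.
data = io.BytesIO(b"".join(b"%d|x|%d\n" % (i, i * 2) for i in range(1000)))
chunks = list(batches(data, sizehint=512))
total = sum(len(c) for c in chunks)
```

Each batch always ends on a line boundary, so a worker never sees a record cut in half; the sizehint only controls roughly how much work is handed over per queue item.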
EDIT: a quick skeleton code
import threading
from collections import deque
import sys
import mmap
class processor(threading.Thread):
    """
    processor gets a batch of data at a time from the diskio thread
    """
    def __init__(self, q):
        threading.Thread.__init__(self, name="plotter")
        self._queue = q

    def run(self):
        # get batched data
        while True:
            # we wait for a batch
            dataloop = self.feed(self._queue.get())
            try:
                while True:
                    self.plot(next(dataloop))
            except StopIteration:
                pass
            # sanitizer exceptions following, maybe

    def parseline(self, line):
        """ return a data struct ready for plotting """
        raise NotImplementedError

    def feed(self, databuf):
        # we yield one-at-a-time data structs, ready to go for plotting
        for line in databuf:
            yield self.parseline(line)

    def plot(self, data):
        """integrate
        https://www.esclab.tw/wiki/index.php/Matplotlib#Asynchronous_plotting_with_threads
        maybe
        """

class sharedq(object):
    """i dont recall where i got this implementation from
    you may write a better one"""
    def __init__(self, maxsize=8192):
        self.queue = deque()
        self.barrier = threading.RLock()
        # both conditions share one lock, so put() and get() exclude each other
        self.read_c = threading.Condition(self.barrier)
        self.write_c = threading.Condition(self.barrier)
        self.msz = maxsize

    def put(self, item):
        with self.barrier:
            # block while the queue is full
            while len(self.queue) >= self.msz:
                self.write_c.wait()
            self.queue.append(item)
            self.read_c.notify()

    def get(self):
        with self.barrier:
            # block while the queue is empty
            while not self.queue:
                self.read_c.wait()
            item = self.queue.popleft()
            self.write_c.notify()
            return item

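q = sharedq()
# sizehint for reading lines
numbytes = 1024
for i in range(8):
    p = processor(q)
    p.start()
for fn in sys.argv[1:]:
    with open(fn, "r+b") as f:
        # you may want a better sizehint here
        mm = mmap.mmap(f.fileno(), 0)
        # mmap objects offer readline() but not readlines(),
        # so collect lines into batches of about numbytes bytes by hand
        batch, size = [], 0
        for line in iter(mm.readline, b""):
            batch.append(line)
            size += len(line)
            if size >= numbytes:
                q.put(batch)
                batch, size = [], 0
        if batch:
            q.put(batch)
        mm.close()
# some cleanup code may be desirable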
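The skeleton leaves the queue implementation and the cleanup open. One possible way to fill both in, sketched here under the assumption that Python's standard queue.Queue is an acceptable replacement for the hand-written sharedq, is to use a bounded queue plus a sentinel object for shutdown. The names STOP, worker and run_pipeline are hypothetical, and the parse function is a toy stand-in for parseline.

```python
import queue
import threading

STOP = object()  # sentinel telling a worker thread to exit


def worker(inbox, outbox, parse):
    """Parse batches from inbox until STOP arrives, pushing results to outbox."""
    while True:
        batch = inbox.get()
        if batch is STOP:
            break
        outbox.put([parse(line) for line in batch])


def run_pipeline(batches, parse, nworkers=4, maxsize=64):
    inbox = queue.Queue(maxsize=maxsize)  # put() blocks when full, like sharedq
    outbox = queue.Queue()                # unbounded, so workers never block on it
    threads = [threading.Thread(target=worker, args=(inbox, outbox, parse))
               for _ in range(nworkers)]
    for t in threads:
        t.start()
    for batch in batches:
        inbox.put(batch)
    for _ in threads:
        inbox.put(STOP)  # one sentinel per worker
    for t in threads:
        t.join()
    # All workers have finished, so the outbox can be drained safely.
    results = []
    while not outbox.empty():
        results.extend(outbox.get())
    return results


# Toy parse function: sum the numeric fields of a pipe-delimited line.
rows = run_pipeline([["1|2", "3|4"], ["5|6"]],
                    parse=lambda line: sum(int(x) for x in line.split("|")))
```

Results arrive in whatever order the workers finish, so sort or tag them if the output order matters. Note also that in CPython, threads mainly help while the program waits on disk I/O; CPU-bound parsing in pure Python is serialized by the global interpreter lock.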
I think Python+NumPy would be the most efficient way, regarding speed and ease of implementation.
NumPy is highly optimized, so the performance is decent, and Python would ease the algorithm implementation part.
This combo should work well for your case, provided you optimize how the file is loaded into memory. Try to find the middle ground: a data block that isn't too large, but is large enough to minimize the read and write cycles, because those are what will slow down the program.
If you feel that this needs more speeding up (which I sincerely doubt), you could use Cython to speed up the sluggish parts.