我有以下的代码写入到MD5SUMS一个日志文件
for file in files_output:
p=subprocess.Popen(['md5sum',file],stdout=logfile)
p.wait()
将这些并行写的? 也就是说,如果需要的md5sum长时间的文件之一,将一个又一个等待前一个完成之前启动?
如果答案以上是肯定的,我可以假设写入日志文件的MD5SUMS的顺序可能是不同的基于的md5sum多久每个文件需要? (有些文件可能是巨大的,一些小)
所有子进程并行运行。 (为了避免这其中有明确等待其完成)。他们甚至可以写入在同一时间日志文件,从而错乱输出。 为了避免这种情况,你应该让每个进程写入到不同的日志文件,并收集所有输出当所有的处理完成。
q = Queue.Queue()
result = {} # used to store the results
for fileName in fileNames:
q.put(fileName)
def worker():
while True:
fileName = q.get()
if fileName is None: # EOF?
return
subprocess_stuff_using(fileName)
wait_for_finishing_subprocess()
checksum = collect_md5_result_for(fileName)
result[fileName] = checksum # store it
threads = [ threading.Thread(target=worker) for _i in range(20) ]
for thread in threads:
thread.start()
q.put(None) # one EOF marker for each thread
在此之后,结果应存放于result
。
- 是的,这些过程的md5sum将同时启动。
- 是的,MD5SUMS写入的顺序将是不可预知的。 一般它被认为是不好的做法,许多进程共享像文件中的单个资源这种方式。
此外,您的制作方式p.wait()
后for
循环将等待只为最后的md5sum过程的完成和他们的休息可能仍在运行。
但是,你可以稍微修改此代码还是有并行处理和同步输出的可预见性的好处,如果你收集的md5sum输出到临时文件,并收集回成一个文件,一旦所有工序均由内部完成。
import subprocess
import os
processes = []
for file in files_output:
f = os.tmpfile()
p = subprocess.Popen(['md5sum',file],stdout=f)
processes.append((p, f))
for p, f in processes:
p.wait()
f.seek(0)
logfile.write(f.read())
f.close()
一个简单的方法来收集并行的md5sum子过程的输出是使用一个线程池,写从主进程文件:
from multiprocessing.dummy import Pool # use threads
from subprocess import check_output
def md5sum(filename):
try:
return check_output(["md5sum", filename]), None
except Exception as e:
return None, e
if __name__ == "__main__":
p = Pool(number_of_processes) # specify number of concurrent processes
with open("md5sums.txt", "wb") as logfile:
for output, error in p.imap(md5sum, filenames): # provide filenames
if error is None:
logfile.write(output)
- 从输出
md5sum
很小,因此可以将其存储在内存中 -
imap
保留订单 -
number_of_processes
可以是来自文件或CPU内核的数量不同(更大的数值,并不意味着更快:这取决于IO的相对性能(磁盘)和CPU)
你可以尝试一次通过多个文件到的md5sum子进程。
你不需要在这种情况下,外部子; 你可以在Python计算MD5 :
import hashlib
from functools import partial
def md5sum(filename, chunksize=2**15, bufsize=-1):
m = hashlib.md5()
with open(filename, 'rb', bufsize) as f:
for chunk in iter(partial(f.read, chunksize), b''):
m.update(chunk)
return m.hexdigest()
要使用的多个进程,而不是线程(以允许纯Python md5sum()
并行利用多个CPU上运行)只是下降.dummy
从进口在上面的代码。