Is it possible to use concurrent.futures to execute a method inside a tkinter class?

2019-09-11 23:42发布

问题:

I am trying to use the pool of workers provided by concurrent.futures.ProcessPoolExecutor to speed up a method inside a tkinter class. Executing the method is CPU-intensive, so "parallelizing" it should shorten the time it takes to complete. I hope to benchmark its performance against a control - a serial execution of the same method. I have written a tkinter GUI test program to perform this benchmark. The serial execution of the method works, but the concurrent part does not. I would appreciate any help in getting the concurrent part of my code to work.

Update: I have ensured that I have correctly implemented concurrent.futures.ProcessPoolExecutor to solve my problem outside of Tk(), i.e. from a standard python3 script. It is explained in this answer. Now I want to implement the concurrent method described in that answer to work with a button in my tkinter.Tk() GUI.

My test code is given below. When you run it, a GUI will appear. When you click the 'FIND' button, the _findmatch function is executed first serially and then concurrently to find how many times the number 5 occurs in the number range of 0 to 1E7 (the value of nmax in the code). The serial part works, but the concurrent part fails with the error shown below. Does anyone know how to fix this pickling error?

Traceback (most recent call last):
  File "/usr/lib/python3.5/multiprocessing/queues.py", line 241, in _feed
    obj = ForkingPickler.dumps(obj)
  File "/usr/lib/python3.5/multiprocessing/reduction.py", line 50, in dumps
    cls(buf, protocol).dump(obj)
_pickle.PicklingError: Can't pickle <class '_tkinter.tkapp'>: attribute lookup tkapp on _tkinter failed

Test Code:

#!/usr/bin/python3
# -*- coding: utf-8 -*-

import tkinter as tk # Python 3 tkinter modules
import tkinter.ttk as ttk
import concurrent.futures as cf
from time import time, sleep
from itertools import repeat, chain 

class App(ttk.Frame):
    '''Tk frame that benchmarks a serial search against a search spread over a
       concurrent.futures.ProcessPoolExecutor and shows both timings.

       NOTE(review): _concurrent_map hands the bound method self._findmatch to
       the pool, so submitting work presumably requires pickling this App
       instance (and the Tk objects it references). That matches the
       "Can't pickle <class '_tkinter.tkapp'>" traceback quoted with this
       listing - confirm before reusing this design.'''
    def __init__(self, parent):
        '''Create the FIND button and the three status labels inside parent.'''
        # Initialise App Frame
        ttk.Frame.__init__(self, parent, style='App.TFrame')
        self.parent=parent

        # Clicking the button runs the whole benchmark in _check().
        self.button = ttk.Button(self, style='start.TButton', text = 'FIND',
                                 command=self._check)
        # label0: prompt/progress text, label1: serial result,
        # label2: concurrent result.
        self.label0 = ttk.Label(self, foreground='blue')
        self.label1 = ttk.Label(self, foreground='red')
        self.label2 = ttk.Label(self, foreground='green')
        self._labels()
        # Labels are stacked in column 0; the button spans all three rows in
        # column 1.
        self.button.grid(row=0, column=1, rowspan=3, sticky='nsew')
        self.label0.grid(row=0, column=0, sticky='nsew')
        self.label1.grid(row=1, column=0, sticky='nsew')
        self.label2.grid(row=2, column=0, sticky='nsew')

    def _labels(self):
        '''Reset the three labels to their initial texts.'''
        self.label0.configure(text='Click "FIND" to see how many times the number 5 appears.')
        self.label1.configure(text='Serial Method:')
        self.label2.configure(text='Concurrent Method:')

    def _check(self):
        '''Button callback: run the search serially, then through the process
           pool, and write each run's match count and elapsed time to label1
           and label2 respectively. Both runs execute inside this callback.'''
        # Initialisation
        self._labels()
        nmax = int(1E7)   # Exclusive upper bound of the searched range
        smatch=[]
        cmatch=[]
        number = '5'      # A string, because _findmatch tests it against str(n)
        self.label0.configure(
            text='Finding the number of times {0} appears in 0 to {1}'.format(
                number, nmax))
        # Process pending idle tasks so the new label0 text is drawn before
        # the long-running search starts.
        self.parent.update_idletasks()

        # Run serial code
        start = time()
        smatch = self._findmatch(0, nmax, number)
        end = time() - start   # Elapsed seconds, despite the name
        self.label1.configure(
            text='Serial: Found {0} occurances,  Time to Find: {1:.6f}sec'.format(
                len(smatch), end))

        # Run serial code concurrently with concurrent.futures
        workers = 6     # Pool of workers
        chunks_vs_workers = 30 # Chunks per worker; author notes a factor of =>14 can provide optimum performance
        num_of_chunks = chunks_vs_workers * workers
        start = time()
        cmatch = self._concurrent_map(nmax, number, workers, num_of_chunks)
        end = time() - start
        self.label2.configure(
            text='Concurrent: Found {0} occurances,  Time to Find: {1:.6f}sec'.format(
                len(cmatch), end))

    def _findmatch(self, nmin, nmax, number):
        '''Return a list of every integer n in range(nmin, nmax) whose decimal
           string contains the string number.

           nmin is inclusive and nmax is exclusive. number must be a str,
           since it is tested with "number in str(n)".'''
        # start/end are only consumed by the commented-out debug print below.
        start = time()
        match=[]
        for n in range(nmin, nmax):
            if number in str(n): match.append(n)
        end = time() - start
        #print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
        #      format(nmin, nmax, number, len(match),end))
        return match

    def _concurrent_map(self, nmax, number, workers, num_of_chunks):
        '''Search range(0, nmax) with ProcessPoolExecutor.map and return the
           matches of all chunks concatenated into one list.

           nmax          -- exclusive upper bound of the searched range
           number        -- string to look for in each integer's decimal text
           workers       -- max_workers passed to the process pool
           num_of_chunks -- number of sub-ranges the work is split into

           Also prints the elapsed time of the pooled section to stdout.'''
        # 1. Local variables
        start = time()
        chunksize = nmax // num_of_chunks
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            # cstart yields the lower bound of each chunk.
            cstart = (chunksize * i for i in range(num_of_chunks))
            # cstop yields the matching upper bounds; the last one is replaced
            # by nmax so the remainder left by the floor division is covered.
            cstop = (chunksize * i if i != num_of_chunks else nmax
                     for i in range(1, num_of_chunks + 1))
            # Despite its name, "futures" is the result iterator returned by
            # executor.map, not a collection of Future objects.
            futures = executor.map(self._findmatch, cstart, cstop, repeat(number))
        # Measured after the with-block has exited, i.e. after the executor
        # has been shut down.
        end = time() - start
        print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
        print("found in {0:.4f}sec".format(end))
        # Each result is one chunk's list of matches; flatten them into one list.
        return list(chain.from_iterable(futures))


if __name__ == '__main__':
    # Top-level window
    root = tk.Tk()
    root.title('App')
    root.geometry('550x60')

    # The App frame fills the window's single grid cell
    app = App(root)
    app.grid(row=0, column=0, sticky='nsew')

    # Let column 0 of both the window and the frame, and row 0 of the
    # window, absorb any extra space when the window is resized
    for container in (root, app):
        container.columnconfigure(0, weight=1)
    root.rowconfigure(0, weight=1)

    # Hand control to the Tk event loop
    app.mainloop()

回答1:

I finally found a way to answer my question.

Mark Summerfield's book, Python in Practice (2014), mentions that the multiprocessing module, which concurrent.futures.ProcessPoolExecutor uses, can only call functions that are importable, and that the data passed to those functions must be picklable. In my original code the function given to the pool was a method of the tkinter GUI class, so sending it to a worker meant pickling the GUI object itself, which is what failed. As such, concurrent.futures.ProcessPoolExecutor and the functions (with their arguments) that it calls must be kept separate from the tkinter GUI code; otherwise it will not work.

As such, I created a separate class to host all the code related to concurrent.futures.ProcessPoolExecutor and the functions and data it calls, instead of putting them in the App class, my tkinter.Tk() GUI class, as I did previously. It worked!

I also managed to use threading.Thread to run my serial and concurrent tasks at the same time.

I am sharing my revised test code below to demonstrate how I did it and hope this helps anyone attempting to use concurrent.futures with tkinter.

It's really beautiful to see all the CPUs revving up with Tk GUI. :)

Revised Test Code:

#!/usr/bin/python3
# -*- coding: utf-8 -*-
''' Code to demonstrate how to use concurrent.futures.Executor object with tkinter.'''

import concurrent.futures as cf
import queue
import threading
import tkinter as tk # Python 3 tkinter modules
import tkinter.ttk as ttk
from itertools import chain
from time import time, sleep


class App(ttk.Frame):
    '''Tk frame that times a serial search against a process-pool search.

       Both searches run in background threads so the GUI stays responsive.
       The worker threads never call into Tk: they post their result text to
       a queue, and the Tk thread polls that queue with after() and updates
       the labels itself.'''

    # Interval, in milliseconds, between checks for worker results.
    _POLL_MS = 100

    def __init__(self, parent):
        '''Create the FIND button and the three status labels inside parent.'''
        # Initialise App Frame
        ttk.Frame.__init__(self, parent)
        self.parent = parent
        # (label, text) pairs produced by the worker threads and consumed
        # only by the Tk thread in _apply_updates().
        self._updates = queue.Queue()

        self.button = ttk.Button(self, text='FIND', command=self._check)
        self.label0 = ttk.Label(self, foreground='blue')
        self.label1 = ttk.Label(self, foreground='red')
        self.label2 = ttk.Label(self, foreground='green')
        self._labels()
        self.button.grid(row=0, column=1, rowspan=3, sticky='nsew')
        self.label0.grid(row=0, column=0, sticky='nsew')
        self.label1.grid(row=1, column=0, sticky='nsew')
        self.label2.grid(row=2, column=0, sticky='nsew')

    def _labels(self):
        '''Reset the three labels to their initial texts.'''
        self.label0.configure(text='Click "FIND" to see how many times the number 5 appears.')
        self.label1.configure(text='Serial Method:')
        self.label2.configure(text='Concurrent Method:')

    def _check(self):
        '''Button callback: start the serial and the pooled search, each in
           its own thread, then begin polling for their results.'''
        # Initialisation
        self._labels()
        nmax = int(1E8)
        workers = 6     # Pool of workers
        chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance
        num_of_chunks = chunks_vs_workers * workers
        number = '5'
        self.label0.configure(
            text='Finding the number of times {0} appears in 0 to {1}'.format(
                number, nmax))
        # Block further clicks until both workers are done: overlapping runs
        # would overwrite serworker/subworker and compete for the CPUs,
        # distorting both timings.
        self.button.state(['disabled'])
        self.parent.update_idletasks()
        # Concurrent management of serial and concurrent tasks using threading
        self.serworker = threading.Thread(target=self._serial,
                                          args=(0, nmax, number))
        self.subworker = threading.Thread(target=self._concurrent,
                                          args=(nmax, number, workers,
                                                num_of_chunks))
        self.serworker.start()
        self.subworker.start()
        self.after(self._POLL_MS, self._poll_workers)

    def _serial(self, nmin, nmax, number):
        '''Thread target: search range(nmin, nmax) in this thread and queue
           the result text for label1.'''
        fm = Findmatch
        # Run serial code
        start = time()
        smatch = fm._findmatch(fm, nmin, nmax, number)
        end = time() - start
        self._updates.put((
            self.label1,
            'Serial Method: {0} occurrences, Compute Time: {1:.6f}sec'.format(
                len(smatch), end)))

    def _concurrent(self, nmax, number, workers, num_of_chunks):
        '''Thread target: search range(0, nmax) through the process pool and
           queue the result text for label2.'''
        fm = Findmatch
        # Run serial code concurrently with concurrent.futures .submit()
        start = time()
        cmatch = fm._concurrent_submit(fm, nmax, number, workers,
                                        num_of_chunks)
        end = time() - start
        self._updates.put((
            self.label2,
            'Concurrent Method: {0} occurrences, Compute Time: {1:.6f}sec'.format(
                len(cmatch), end)))

    def _poll_workers(self):
        '''Runs in the Tk thread: show any queued results and either schedule
           the next poll or, once both workers have ended, re-enable FIND.'''
        # Sample liveness BEFORE draining: a thread that is already dead has
        # already queued its result, so the drain below cannot miss it.
        still_running = (self.serworker.is_alive()
                         or self.subworker.is_alive())
        self._apply_updates()
        if still_running:
            self.after(self._POLL_MS, self._poll_workers)
        else:
            self.button.state(['!disabled'])

    def _apply_updates(self):
        '''Runs in the Tk thread: apply every queued (label, text) pair.'''
        while True:
            try:
                label, text = self._updates.get_nowait()
            except queue.Empty:
                return
            label.configure(text=text)


class Findmatch:
    ''' Search routines kept outside the tkinter GUI class so that
        concurrent.futures.ProcessPoolExecutor can hand them to worker
        processes. Multiprocessing requirements: code must be importable and
        its data must be picklable. ref. Python in Practice, by Mark
        Summerfield, section 4.3.2, pg 173, 2014

        The class keeps no state. Its methods work both on an instance and
        with the class itself passed explicitly as self, e.g.
        Findmatch._findmatch(Findmatch, 0, 100, '5').'''

    def __init__(self):
        # Nothing to initialise: the class is stateless.
        pass

    @staticmethod
    def _scan(nmin, nmax, number):
        '''Return every integer n in range(nmin, nmax) whose decimal string
           contains the string number.

           This is the function actually sent to the worker processes; as a
           static method it is pickled by name and carries no self along.'''
        return [n for n in range(nmin, nmax) if number in str(n)]

    def _findmatch(self, nmin, nmax, number):
        '''Function to find the occurrence of number in range nmin to nmax and
           return the found occurrences in a list.

           nmin is inclusive, nmax is exclusive, and number must be a str.'''
        return Findmatch._scan(nmin, nmax, number)

    def _concurrent_submit(self, nmax, number, workers, num_of_chunks):
        '''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
           find the occurrences of a given number in a number range in a concurrent
           manner.

           nmax          -- exclusive upper bound; range(0, nmax) is searched
           number        -- string to look for in each integer's decimal text
           workers       -- max_workers passed to the process pool
           num_of_chunks -- number of sub-ranges the work is split into

           Returns all matches as one list in ascending order and prints the
           elapsed time of the pooled section to stdout.'''
        # 1. Local variables
        start = time()
        chunksize = nmax // num_of_chunks
        # Kept local on purpose: storing the futures on self would share them
        # between concurrent calls and keep every result list alive.
        futures = []
        #2. Parallelization
        with cf.ProcessPoolExecutor(max_workers=workers) as executor:
            # 2.1. Discretise workload and submit to worker pool
            for i in range(num_of_chunks):
                cstart = chunksize * i
                # The last chunk runs up to nmax so the remainder left by the
                # floor division above is not lost.
                cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
                futures.append(executor.submit(
                    Findmatch._scan, cstart, cstop, number))
        end = time() - start
        print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
        print("found in {0:.4f}sec".format(end))
        # Collect in submission order (not completion order) so the merged
        # list is deterministic and ascending.
        return list(chain.from_iterable(f.result() for f in futures))


if __name__ == '__main__':
    # Create and size the top-level window
    root = tk.Tk()
    root.title('App')
    root.geometry('550x60')

    # Place the App frame in the window's only grid cell
    app = App(root)
    app.grid(row=0, column=0, sticky='nsew')

    # Give all spare space to row 0 / column 0 of the window and to the
    # label column of the frame
    for container in (root, app):
        container.columnconfigure(0, weight=1)
    root.rowconfigure(0, weight=1)

    # Enter the Tk event loop
    app.mainloop()