how come I have more threads than processes I aske

Posted 2019-08-29 01:02

Question:

I am trying to parallelize some work. It runs on my Mac (Python 3.2.2 under Mac OS 10.7) but gives the following error on a Linux cluster, where I have 4 cores and Python 3.2. The error messages continue until I break execution manually.

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/n/sw/python-3.2/lib/python3.2/threading.py", line 736, in _bootstrap_inner
    self.run()
  File "/n/sw/python-3.2/lib/python3.2/threading.py", line 689, in run
    self._target(*self._args, **self._kwargs)
  File "/n/sw/python-3.2/lib/python3.2/multiprocessing/pool.py", line 338, in _handle_tasks
    put(task)
_pickle.PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed

Process PoolWorker-2:
Process PoolWorker-4:
Traceback (most recent call last):
Traceback (most recent call last):
  File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 259, in _bootstrap
  File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 259, in _bootstrap
Process PoolWorker-1:
Traceback (most recent call last):
  File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 259, in _bootstrap
Process PoolWorker-12:
Traceback (most recent call last):
  File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 259, in _bootstrap
    self.run()
  File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/n/sw/python-3.2/lib/python3.2/multiprocessing/pool.py", line 102, in worker
Process PoolWorker-11:
Traceback (most recent call last):
  File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 259, in _bootstrap
    self.run()
  File "/n/sw/python-3.2/lib/python3.2/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "/n/sw/python-3.2/lib/python3.2/multiprocessing/pool.py", line 102, in worker

For reference, here is part of my code. I don't see how this call to multiprocessing.Pool could result in these errors, especially PoolWorkers numbered higher than the 4 processes I asked for. Thanks for any thoughts!

import csv
import networkx as nx
import time
import shutil
import datetime
import pydot
import os
import re
import logging
from operator import itemgetter
import numpy as np
from multiprocessing import Pool
import itertools

# Dictionary for edge attributes in projected graph:
# 0: overlap_length
# 1: overlap_start
# 2: overlap_end
# 3: cell
# 4: level

def chunks(l,n):
    """Yield successive tuples of at most `n` items from the iterable `l`"""
    l_c = iter(l)
    while True:
        x = tuple(itertools.islice(l_c,n))
        if not x:
            return
        yield x

def overlaps(G,B,u,nbrs2):
    l = []
    for v in nbrs2:
        for mutual_cell in set(B[u]) & set(B[v]):
            for uspell in B.get_edge_data(u,mutual_cell).values():
                ustart = uspell[1]
                uend = uspell[2]
                for vspell in B.get_edge_data(v,mutual_cell).values():
                    vstart = vspell[1]
                    vend = vspell[2]
                    if uend > vstart and vend > ustart:
                        ostart = max(ustart,vstart)
                        oend = min(uend,vend)
                        olen = (oend-ostart+1)/86400
                        ocell = mutual_cell
                        if (v not in G[u] or ostart not in [ edict[1] for edict in G[u][v].values() ]):
                            l.append((u,v,{0: olen,1: ostart,2: oend,3: ocell}))
    return l

def _pmap1(arg_tuple):
    """Pool.map passes a single argument to the mapped function, so this wrapper
    takes the tuple (G, B, u, nbrs2) and unpacks it into a call to `overlaps`.
    """
    return overlaps(*arg_tuple)

def time_overlap_projected_graph_parallel(B, nodes):
    G=nx.MultiGraph()
    G.add_nodes_from((n,B.node[n]) for n in nodes)
    add_edges_from = nx.MultiGraph.add_edges_from
    get_edge_data = nx.MultiGraph.get_edge_data
    p = Pool(processes=4)
    node_divisor = len(p._pool)
    for u in nodes:
        unbrs = set(B[u])
        nbrs2 = set((n for nbr in unbrs for n in B[nbr])) - set([u])
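        # iterate over subsets of neighbors - parallelize
        # chunk size must be at least 1: with a size of 0, chunks() yields nothing
        # and nodes with fewer neighbors than workers would silently get no edges
        chunk_size = max(1, len(nbrs2) // node_divisor)
        node_chunks = list(chunks(nbrs2, chunk_size))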
        num_chunks = len(node_chunks)
        pedgelists = p.map(_pmap1,
                           zip([G]*num_chunks,
                               [B]*num_chunks,
                               [u]*num_chunks,
                               node_chunks))
        # compile long list
        ll = []
        for l in pedgelists:
            ll.extend(l)
        # add edges from long list in a single step
        G.add_edges_from(ll)
    return G
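
On the worker names above PoolWorker-4: as far as I can tell, the number in a worker's name is not an index into the pool. Default process names come from a counter in the parent process that only counts up, so any process created after the first four gets a higher number. That can be a replacement for a pool worker that exited, or a worker belonging to a second Pool. The sketch below only illustrates the naming; it is not a diagnosis of this particular run. The helpers `noop`, `counter_of` and `make_names` are made up for the illustration, and the Process objects are created but never started.

```python
import multiprocessing as mp


def noop():
    """Placeholder target; the processes below are never started."""


def counter_of(name):
    """Return the trailing counter of a name such as 'Process-3' or 'Process-1:3'."""
    return int(name.split("-")[-1].split(":")[-1])


def make_names(count):
    """Create `count` Process objects (without starting them) and return their default names."""
    return [mp.Process(target=noop).name for _ in range(count)]


if __name__ == "__main__":
    first_batch = make_names(4)   # e.g. the first four workers
    second_batch = make_names(4)  # e.g. four processes created later
    print(first_batch)
    print(second_batch)
    # The counters in the second batch continue where the first batch stopped.
    print([counter_of(n) for n in first_batch + second_batch])
```

Pool renames its processes from "Process" to "PoolWorker" but keeps the number, so a name like PoolWorker-12 says that twelve processes have been created so far, not that twelve are running at once.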

Answer 1:

OK, I was inadvertently running the parallel code under cProfile on the cluster, while my offline test runs were not profiled. The code runs fine, but profiling breaks down, as it always should for parallel scripts. It is not related to the cluster or LSF. Sorry.
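
If timing data is still wanted, one option is to keep the profiler away from the pool and profile the per-task function serially in the parent process. This is a minimal sketch under that assumption, not what I ran on the cluster: `busy_task` is a hypothetical stand-in for one unit of pool work (such as one `overlaps` call), and `profile_serially` is a made-up helper name.

```python
import cProfile
import io
import pstats


def busy_task(n):
    """Hypothetical stand-in for one unit of pool work."""
    return sum(i * i for i in range(n))


def profile_serially(func, inputs):
    """Run `func` over `inputs` in this process under cProfile.

    Returns the list of results and the formatted profile report.
    """
    profiler = cProfile.Profile()
    profiler.enable()
    results = [func(item) for item in inputs]
    profiler.disable()

    # Write the ten most expensive entries (by cumulative time) to a string.
    report = io.StringIO()
    stats = pstats.Stats(profiler, stream=report)
    stats.sort_stats("cumulative").print_stats(10)
    return results, report.getvalue()


if __name__ == "__main__":
    results, report = profile_serially(busy_task, [1000, 2000, 3000])
    print(results)
    print(report)
```

Nothing is pickled or sent to another process here, so the profiler and multiprocessing never meet. The numbers describe the serial cost of each task, not the parallel run as a whole.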