使用多查找网络路径(Using multiprocessing for finding networ

2019-08-04 12:15发布

我目前使用的networkx功能* all_simple_paths *找到一个给定的源和目标节点的G网内的所有路径。

在较大/更密集的网络,这个过程是非常密集。

我想知道,如果多可以想见,在这个问题上被使用,如果任何人对如何可能实现的任何想法,通过创建池等。

import networkx as nx

G = nx.complete_graph(8)
sources = [1,2]
targets = [5,6,7]

for target in targets:
    for source in sources:
        for path in nx.all_simple_paths(G, source=source, target=target, cutoff=None):
            print(path)

提前非常感谢您的任何建议!

Answer 1:

下面是使用工作进程的集合的版本。 每个工人获取source, target对从一个队列,并收集在一个列表中的路径。 当所有的路径已被找到,则结果被放入一个输出队列,和由主处理核对。

import networkx as nx
import multiprocessing as mp
import random
import sys
import itertools as IT
import logging
logger = mp.log_to_stderr(logging.DEBUG)


def worker(inqueue, output):
    result = []
    count = 0
    for pair in iter(inqueue.get, sentinel):
        source, target = pair
        for path in nx.all_simple_paths(G, source = source, target = target,
                                        cutoff = None):
            result.append(path)
            count += 1
            if count % 10 == 0:
                logger.info('{c}'.format(c = count))
    output.put(result)

def test_workers():
    result = []
    inqueue = mp.Queue()
    for source, target in IT.product(sources, targets):
        inqueue.put((source, target))
    procs = [mp.Process(target = worker, args = (inqueue, output))
             for i in range(mp.cpu_count())]
    for proc in procs:
        proc.daemon = True
        proc.start()
    for proc in procs:    
        inqueue.put(sentinel)
    for proc in procs:
        result.extend(output.get())
    for proc in procs:
        proc.join()
    return result

def test_single_worker():
    result = []
    count = 0
    for source, target in IT.product(sources, targets):
        for path in nx.all_simple_paths(G, source = source, target = target,
                                        cutoff = None):
            result.append(path)
            count += 1
            if count % 10 == 0:
                logger.info('{c}'.format(c = count))

    return result

sentinel = None

seed = 1
m = 1
N = 1340//m
G = nx.gnm_random_graph(N, int(1.7*N), seed)
random.seed(seed)
sources = [random.randrange(N) for i in range(340//m)]
targets = [random.randrange(N) for i in range(1000//m)]
output = mp.Queue()

if __name__ == '__main__':
    test_workers()
    # test_single_worker()
    # assert set(map(tuple, test_workers())) == set(map(tuple, test_single_worker()))

test_workers采用多处理, test_single_worker使用一个单一的过程。

运行test.py不会抛出一个AssertionError,所以它看起来像两个函数返回相同的结果(至少在有限的测试中,我已经运行)。

下面是使用timeit结果:

% python -mtimeit -s'import test as t' 't.test_workers()'
10 loops, best of 3: 6.71 sec per loop

% python -mtimeit -s'import test as t' 't.test_single_worker()'
10 loops, best of 3: 12.2 sec per loop

所以test_workers才实现了test_single_worker 1.8倍加速在这种情况下,2核系统上。 我们希望,该代码将很好地扩展了您的真正的问题了。 我很想知道结果。


感兴趣的几点:

  • 调用pool.apply_async在短暂的功能是很慢的,因为太多的时间花费传递参数中,并导致了通过队列,而不是使用的CPU做有用的计算。
  • 这是更好地收集在一个列表结果,把整个结果的output队列,而不是把结果output一次一个。 把队列中的每个对象腌,这是更快咸菜一个大名单比它是很多小名单。
  • 我认为它是安全的,从只有一个进程进行打印,因此打印语句不互相步骤(导致错位输出)。


Answer 2:

对于最简单的情况看来,你的路没有关系,相濡以沫,不如是相同图形的一部分等,所以不会有任何锁定的问题。

我会做的是你可以使用multiprocessing模块,在每个循环在启动一个新的进程targets使用Poolmap方法。

def create_graph_from_target( target )
    for source in sources:
        for path in nx.all_simple_paths(G, source=source, target=target, cutoff=None):
            print(path)

from multiprocessing import Pool
p = Pool( processes=4 )

p.map( create_graph_from_target, targets )
p.close()
p.join()


文章来源: Using multiprocessing for finding network paths