Question Est-il possible de multiprocesser une fonction qui retourne quelque chose en Python?


En Python, j'ai vu de nombreux exemples où le multitraitement est appelé, mais la cible imprime simplement quelque chose. J'ai un scénario où la cible renvoie 2 variables, que je dois utiliser plus tard. Par exemple:

def foo(some args):
   a = someObject
   b = someObject
   return a,b

p1=multiprocess(target=foo,args(some args))
p2=multiprocess(target=foo,args(some args))
p3=multiprocess(target=foo,args(some args))

Maintenant quoi? Je peux faire .start et .join, mais comment récupérer les résultats individuels? Je dois attraper le retour a, b pour tous les travaux que j'exécute et ensuite travailler dessus.


18
2018-05-29 11:12


origine


Réponses:


Oui, bien sûr - vous pouvez utiliser plusieurs méthodes. L'un des plus faciles est un partage Queue. Voir un exemple ici: http://eli.thegreenplace.net/2012/01/16/python-parallelizing-cpu-bound-tasks-with-multiprocessing/


17
2018-05-29 11:15



Vous cherchez à faire des travaux parallèlement embarrassants en utilisant plusieurs processus ... alors pourquoi ne pas utiliser un Pool? UNE Pool veillera à démarrer les processus, à récupérer les résultats et à vous renvoyer les résultats. Ici j'utilise pathos, qui a une fourchette de multiprocessing, car sa sérialisation est bien meilleure que celle fournie par la bibliothèque standard.

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> 
>>> def foo(obj1, obj2):
...   a = obj1.x**2
...   b = obj2.x**2
...   return a,b
... 
>>> class Bar(object):
...   def __init__(self, x):
...     self.x = x
... 
>>> res = Pool().map(foo, [Bar(1),Bar(2),Bar(3)], [Bar(4),Bar(5),Bar(6)])
>>> res
[(1, 16), (4, 25), (9, 36)]

Et vous voyez ça foo prend deux arguments et renvoie un tuple de deux objets. le map méthode de Pool soumet foo aux processus sous-jacents et renvoie le résultat comme res.

Tu peux recevoir pathos ici: https://github.com/uqfoundation


22
2018-03-14 15:22



Je copie cet exemple directement à partir des documents, car je ne peux pas vous donner de lien direct. Notez qu'il affiche les résultats de done_queue, mais vous pouvez faire ce que vous voulez avec.

#
# Simple example which uses a pool of workers to carry out some tasks.
#
# Notice that the results will probably not come out of the output
# queue in the same in the same order as the corresponding tasks were
# put on the input queue.  If it is important to get the results back
# in the original order then consider using `Pool.map()` or
# `Pool.imap()` (which will save on the amount of code needed anyway).
#
# Copyright (c) 2006-2008, R Oudkerk
# All rights reserved.
#

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print 'Unordered results:'
    for i in range(len(TASKS1)):
        print '\t', done_queue.get()

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print '\t', done_queue.get()

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()

Il est originaire de la module de multitraitement docs.


8
2018-05-29 11:19



Il ne fonctionnera pas sur Windows mais voici mon décorateur multiprocesseur pour les fonctions, il retourne une file d'attente que vous pouvez interroger et collecter les données renvoyées depuis

import os
from Queue import Queue
from multiprocessing import Process

def returning_wrapper(func, *args, **kwargs):
    queue = kwargs.get("multiprocess_returnable")
    del kwargs["multiprocess_returnable"]
    queue.put(func(*args, **kwargs))

class Multiprocess(object):
    """Cute decorator to run a function in multiple processes."""
    def __init__(self, func):
        self.func = func
        self.processes = []

    def __call__(self, *args, **kwargs):
        num_processes = kwargs.get("multiprocess_num_processes", 2) # default to two processes.
        return_obj = kwargs.get("multiprocess_returnable", Queue()) # default to stdlib Queue
        kwargs["multiprocess_returnable"] = return_obj
        for i in xrange(num_processes):
            pro = Process(target=returning_wrapper, args=tuple([self.func] + list(args)), kwargs=kwargs)
            self.processes.append(pro)
            pro.start()
        return return_obj


@Multiprocess
def info():
    print 'module name:', __name__
    print 'parent process:', os.getppid()
    print 'process id:', os.getpid()
    return 4 * 22

data = info()
print data.get(False)

2
2018-05-29 11:40



Voici un exemple de recherche multi-processus pour les fichiers volumineux.


1
2017-12-31 13:33



Pourquoi personne n'utilise rappeler de multiprocessing.Pool?

Exemple:

from multiprocessing import Pool
from contextlib import contextmanager

from pprint import pprint
from requests import get as get_page

@contextmanager
def _terminating(thing):
    try:
        yield thing
    finally:
        thing.terminate()

def _callback(*args, **kwargs):
    print("CALBACK")
    pprint(args)
    pprint(kwargs)

print("Processing...")
with _terminating(Pool(processes=WORKERS)) as pool:
    results = pool.map_async(get_page, URLS, callback=_callback)

    start_time = time.time()
    results.wait()
    end_time = time.time()
    print("Time for Processing: %ssecs" % (end_time - start_time))

Ici, j'imprime à la fois args et kwargs. Mais vous pouvez remplacer rappeler par:

def _callback2(responses):
    for r in responses:
        print(r.status_code) # or do whatever with response...

1
2018-05-21 13:31