Question concurrent.futures.ThreadPoolExecutor.map (): le délai d'attente ne fonctionne pas


import concurrent.futures
import time 

def process_one(i):
    try:                                                                             
        print("dealing with {}".format(i))                                           
        time.sleep(50)
        print("{} Done.".format(i))                                                  
    except Exception as e:                                                           
        print(e)

def process_many():
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: 
        executor.map(process_one,
                range(100),                                                          
                timeout=3)                                                           


if __name__ == '__main__':                                                           
    MAX_WORKERS = 10
    try:
        process_many()
    except Exception as e:                                                           
        print(e)      

le docs dire:

L'itérateur renvoyé déclenche un concurrent.futures.TimeoutError si __next__() est appelé et le résultat n'est pas disponible après timeout secondes de l'appel initial à Executor.map()

Mais ici, le script n'a soulevé aucune exception et a continué d'attendre. Aucune suggestion?


13
2017-07-19 10:50


origine


Réponses:


Comme les documents l'indiquent, l'erreur de délai d'attente ne sera levée que si vous appelez le __next__() méthode sur la carte. Pour appeler cette méthode, vous pouvez par exemple convertir la sortie en liste:

from concurrent import futures
import threading
import time


def task(n):
    print("Launching task {}".format(n))
    time.sleep(n)
    print('{}: done with {}'.format(threading.current_thread().name, n))
    return n / 10


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    print('main: starting')
    try:
        # without this conversion to a list, the timeout error is not raised
        real_results = list(results) 
    except futures._base.TimeoutError:
        print("TIMEOUT")

Sortie:

Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-9_0: done with 1
ThreadPoolExecutor-9_1: done with 2
TIMEOUT
ThreadPoolExecutor-9_2: done with 3
ThreadPoolExecutor-9_3: done with 4
ThreadPoolExecutor-9_4: done with 5

Ici, la n-ième tâche dort n secondes, le délai d'attente est augmenté après la tâche 2 est terminée.


MODIFIER: Si vous souhaitez terminer les tâches qui ne sont pas terminées, vous pouvez essayer les réponses dans ce question (ils n'utilisent pas ThreadPoolExecutor.map() cependant), ou vous pouvez simplement ignorer les valeurs renvoyées par les autres tâches et les laisser terminer:

from concurrent import futures
import threading
import time


def task(n):
    print("Launching task {}".format(n))
    time.sleep(n)
    print('{}: done with {}'.format(threading.current_thread().name, n))
    return n


with futures.ThreadPoolExecutor(max_workers=5) as ex:
    results = ex.map(task, range(1, 6), timeout=3)
    outputs = []
    try:
        for i in results:
            outputs.append(i)
    except futures._base.TimeoutError:
        print("TIMEOUT")
    print(outputs)

Sortie:

Launching task 1
Launching task 2
Launching task 3
Launching task 4
Launching task 5
ThreadPoolExecutor-5_0: done with 1
ThreadPoolExecutor-5_1: done with 2
TIMEOUT
[1, 2]
ThreadPoolExecutor-5_2: done with 3
ThreadPoolExecutor-5_3: done with 4
ThreadPoolExecutor-5_4: done with 5

0
2017-07-22 10:37



Comme on le voit dans le la source (pour python 3.7), map renvoie une fonction:

def map(self, fn, *iterables, timeout=None, chunksize=1):
    ...
    if timeout is not None:
        end_time = timeout + time.time()
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
    # Yield must be hidden in closure so that the futures are submitted
    # before the first iterator value is required.
    def result_iterator():
        try:
            # reverse to keep finishing order
            fs.reverse()
            while fs:
                # Careful not to keep a reference to the popped future
                if timeout is None:
                    yield fs.pop().result()
                else:
                    yield fs.pop().result(end_time - time.time())
        finally:
            for future in fs:
                future.cancel()
    return result_iterator()

le TimeoutError est élevé de yield fs.pop().result(end_time - time.time()) appeler mais vous devez demander un résultat pour atteindre cet appel.

La raison en est que vous ne vous souciez pas de soumission les tâches. Les tâches sont soumises et commencent à s'exécuter dans les threads d'arrière-plan. Ce dont vous vous souciez, c'est le délai d'attente lorsque vous demandez un résultat - c'est un cas d'usage habituel que vous soumettez des tâches et que vous leur demandez un résultat dans un temps limité, pas seulement les soumettez et attendez qu'elles se terminent dans un temps limité.

Si ce dernier est ce dont vous étiez, vous pourriez utiliser wait, comme illustré par exemple dans Délais individuels pour concurrents.futures


0
2017-07-27 11:14