Question Comment utiliser le threading en Python?


J'essaie de comprendre le threading en Python. J'ai regardé la documentation et les exemples, mais franchement, de nombreux exemples sont trop sophistiqués et j'ai du mal à les comprendre.

Comment montrez-vous clairement les tâches divisées pour le multi-threading?


928
2018-05-17 04:24


origine


Réponses:


Depuis que cette question a été posée en 2010, il y a eu une réelle simplification dans la façon de faire du multithreading simple avec python avec carte et bassin.

Le code ci-dessous provient d'un article / blog que vous devriez absolument vérifier (aucune affiliation) - Parallélisme en une ligne: Un meilleur modèle pour les tâches de threading au jour le jour. Je vais résumer ci-dessous - il ne reste que quelques lignes de code:

from multiprocessing.dummy import Pool as ThreadPool 
pool = ThreadPool(4) 
results = pool.map(my_function, my_array)

Quelle est la version multithread de:

results = []
for item in my_array:
    results.append(my_function(item))

La description

Map est une petite fonction sympa, et la clé pour injecter facilement du parallélisme dans votre code Python. Pour ceux qui ne sont pas familiers, la carte est quelque chose de levé à partir de langages fonctionnels comme Lisp. C'est une fonction qui mappe une autre fonction sur une séquence.

Map gère l'itération sur la séquence pour nous, applique la fonction et stocke tous les résultats dans une liste pratique à la fin.

enter image description here


la mise en oeuvre

Les versions parallèles de la fonction map sont fournies par deux bibliothèques: multiprocessing, et aussi son petit pas connu, mais tout aussi fantastique: child: multiprocessing.dummy.

multiprocessing.dummy est exactement le même que le module de multi-traitement, mais utilise des threads à la place (une distinction importante - utiliser plusieurs processus pour les tâches gourmandes en ressources processeur; discussions pour (et pendant) IO):

multiprocessing.dummy réplique l'API du multitraitement mais n'est pas plus qu'un wrapper autour du module de threading.

import urllib2 
from multiprocessing.dummy import Pool as ThreadPool 

urls = [
  'http://www.python.org', 
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# make the Pool of workers
pool = ThreadPool(4) 

# open the urls in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# close the pool and wait for the work to finish 
pool.close() 
pool.join() 

Et les résultats de synchronisation:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

Passer plusieurs arguments (fonctionne comme ça seulement en Python 3.3 et plus tard):

Pour passer plusieurs tableaux:

results = pool.starmap(function, zip(list_a, list_b))

ou pour passer une constante et un tableau:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

Si vous utilisez une version antérieure de Python, vous pouvez passer plusieurs arguments via cette solution de contournement.

(Grâce à user136036 pour le commentaire utile)


1008
2018-02-11 19:53



Voici un exemple simple: vous devez essayer quelques URL alternatives et retourner le contenu du premier pour répondre.

import Queue
import threading
import urllib2

# called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

C'est un cas où le threading est utilisé comme une simple optimisation: chaque sous-thread attend une URL pour résoudre et répondre, afin de mettre son contenu dans la file d'attente; chaque thread est un démon (ne gardera pas le processus en place si le thread principal se termine - c'est plus commun que pas); le thread principal démarre tous les sous-threads, fait un get sur la file d'attente pour attendre que l'un d'eux a fait un put, puis émet les résultats et se termine (ce qui supprime tous les sous-threads qui pourraient encore être en cours d'exécution, car ce sont des threads démon).

L'utilisation correcte des threads en Python est invariablement connectée aux opérations d'E / S (puisque CPython n'utilise pas plusieurs cœurs pour exécuter des tâches liées au CPU de toute façon, la seule raison de threading ne bloque pas le processus pendant qu'il y a des E / S ). Les files d'attente sont presque invariablement la meilleure façon d'extraire du travail vers des threads et / ou de collecter les résultats du travail, d'ailleurs, et elles sont intrinsèquement sûres pour vous éviter de vous soucier des verrous, conditions, événements, sémaphores et autres concepts de coordination / communication de fil.


672
2018-05-17 04:36



REMARQUE: Pour la parallélisation réelle en Python, vous devez utiliser le multitraitement module pour bifurquer plusieurs processus qui s'exécutent en parallèle (en raison du verrou global de l'interpréteur, les threads Python fournissent un entrelacement mais sont en fait exécutés en série, pas en parallèle, et ne sont utiles que pour l'entrelacement des opérations d'E / S).

Cependant, si vous recherchez simplement l'entrelacement (ou si vous effectuez des opérations d'E / S qui peuvent être parallélisées malgré le verrou global de l'interpréteur), alors enfilage module est l'endroit pour commencer. Comme un exemple très simple, considérons le problème de la somme d'une grande plage en sommant des sous-domaines en parallèle:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()  
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

Notez que ce qui précède est un exemple très stupide, car il ne fait absolument aucune E / S et sera exécuté en série bien qu'intercalé (avec la surcharge supplémentaire de changement de contexte) dans CPython en raison du verrou global de l'interpréteur.


226
2018-05-17 04:35



Comme d'autres mentionnés, CPython peut utiliser des threads uniquement pour les attentes I \ O dues à GIL. Si vous souhaitez bénéficier de plusieurs cœurs pour les tâches liées au processeur, utilisez multitraitement:

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

86
2018-03-08 22:22



Juste une note, file d'attente n'est pas nécessaire pour le filetage.

C'est l'exemple le plus simple que j'imagine qui montre 10 processus s'exécutant simultanément.

import threading
from random import randint
from time import sleep


def print_number(number):
    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):
    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"

84
2017-09-23 16:07



La réponse d'Alex Martelli m'a aidé, mais voici une version modifiée que j'ai trouvé plus utile (du moins pour moi).

import Queue
import threading
import urllib2

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

#load up a queue with your data, this will handle locking
q = Queue.Queue()
for url in worker_data:
    q.put(url)

#define a worker function
def worker(queue):
    queue_full = True
    while queue_full:
        try:
            #get your data off the queue, and do some work
            url= queue.get(False)
            data = urllib2.urlopen(url).read()
            print len(data)

        except Queue.Empty:
            queue_full = False

#create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()

38
2017-10-01 15:50



J'ai trouvé cela très utile: créer autant de threads que de cœurs et les laisser exécuter un (grand) nombre de tâches (dans ce cas, appeler un programme shell):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): #put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        #execute a task: call a shell program and wait until it completes
        subprocess.call("echo "+str(item), shell=True) 
        q.task_done()

cpus=multiprocessing.cpu_count() #detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() #block until all tasks are done

19
2018-06-06 23:51



Pour moi, l'exemple parfait pour Threading est la surveillance des événements asynchrones. Regardez ce code.

# thread_test.py
import threading
import time 

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

Vous pouvez jouer avec ce code en ouvrant une session IPython et en faisant quelque chose comme:

>>>from thread_test import Monitor
>>>a = [0]
>>>mon = Monitor(a)
>>>mon.start()
>>>a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

Attends quelques minutes

>>>a[0] = 2
Mon = 2

15
2018-04-14 04:18



Étant donné une fonction, f, enfilez comme ceci:

import threading
threading.Thread(target=f).start()

Pour transmettre des arguments à f

threading.Thread(target=f, args=(a,b,c)).start()

15
2018-03-16 16:07



Python 3 a la facilité de Lancement de tâches parallèles. Cela rend notre travail plus facile.

Il a pour pool de threads et Processus de mise en commun.

Ce qui suit donne un aperçu:

Exemple de ThreadPoolExecutor

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

14
2017-07-20 11:17



En utilisant le nouveau flamboyant concurrent.futures module

def sqr(val):
    import time
    time.sleep(0.1)
    return val * val

def process_result(result):
    print(result)

def process_these_asap(tasks):
    import concurrent.futures

    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = []
        for task in tasks:
            futures.append(executor.submit(sqr, task))

        for future in concurrent.futures.as_completed(futures):
            process_result(future.result())
        # Or instead of all this just do:
        # results = executor.map(sqr, tasks)
        # list(map(process_result, results))

def main():
    tasks = list(range(10))
    print('Processing {} tasks'.format(len(tasks)))
    process_these_asap(tasks)
    print('Done')
    return 0

if __name__ == '__main__':
    import sys
    sys.exit(main())

L'approche de l'exécuteur peut sembler familière à tous ceux qui ont déjà eu les mains sales avec Java.

Aussi sur une note de côté: Pour garder l'univers sain, n'oubliez pas de fermer vos pools / exécuteurs si vous n'utilisez pas with contexte (qui est tellement génial qu'il le fait pour vous)


13
2017-10-29 21:42