Question Python multiprocessing en toute sécurité écrit dans un fichier


J'essaie de résoudre un gros problème numérique qui implique beaucoup de sous-problèmes, et j'utilise le module multiprocesseur de Python (en particulier Pool.map) pour séparer différents sous-problèmes indépendants sur différents cœurs. Chaque sous-problème implique le calcul de nombreux sous-sous-problèmes, et j'essaie de mémoriser efficacement ces résultats en les stockant dans un fichier s'ils n'ont encore été calculés par aucun processus. Sinon, ignorez le calcul et lisez les résultats du fichier.

J'ai des problèmes de concomitance avec les fichiers: différents processus vérifient parfois si un sous-sous-problème a été calculé (en recherchant le fichier dans lequel les résultats seraient stockés), voyez qu'il ne l'a pas fait, exécutez le calcul, puis essayez d'écrire les résultats dans le même fichier en même temps. Comment puis-je éviter d'écrire des collisions comme celle-ci?


37
2017-11-19 01:13


origine


Réponses:


@ GP89 a mentionné une bonne solution. Utilisez une file d'attente pour envoyer les tâches d'écriture à un processus dédié disposant d'un accès en écriture exclusif au fichier. Tous les autres travailleurs ont un accès en lecture seule. Cela éliminera les collisions. Voici un exemple qui utilise apply_async, mais cela fonctionnera aussi avec map:

import multiprocessing as mp
import time

fn = 'c:/temp/temp.txt'

def worker(arg, q):
    '''stupidly simulates long running process'''
    start = time.clock()
    s = 'this is a test'
    txt = s
    for i in xrange(200000):
        txt += s 
    done = time.clock() - start
    with open(fn, 'rb') as f:
        size = len(f.read())
    res = 'Process' + str(arg), str(size), done
    q.put(res)
    return res

def listener(q):
    '''listens for messages on the q, writes to file. '''

    f = open(fn, 'wb') 
    while 1:
        m = q.get()
        if m == 'kill':
            f.write('killed')
            break
        f.write(str(m) + '\n')
        f.flush()
    f.close()

def main():
    #must use Manager queue here, or will not work
    manager = mp.Manager()
    q = manager.Queue()    
    pool = mp.Pool(mp.cpu_count() + 2)

    #put listener to work first
    watcher = pool.apply_async(listener, (q,))

    #fire off workers
    jobs = []
    for i in range(80):
        job = pool.apply_async(worker, (i, q))
        jobs.append(job)

    # collect results from the workers through the pool result queue
    for job in jobs: 
        job.get()

    #now we are done, kill the listener
    q.put('kill')
    pool.close()

if __name__ == "__main__":
   main()

71
2017-11-23 13:38



Il me semble que vous devez utiliser Manager pour enregistrer temporairement vos résultats dans une liste, puis écrire les résultats de la liste dans un fichier. En outre, utilisez starmap pour transmettre l'objet que vous souhaitez traiter et la liste gérée. La première étape consiste à créer le paramètre à transmettre à starmap, qui inclut la liste gérée.

from multiprocessing import Manager
from multiprocessing import Pool  
import pandas as pd```

def worker(row, param):
    # do something here and then append it to row
    x = param**2
    row.append(x)

if __name__ == '__main__':
    pool_parameter = [] # list of objects to process
    with Manager() as mgr:
        row = mgr.list([])

        # build list of parameters to send to starmap
        for param in pool_parameter:
            params.append([row,param])

        with Pool() as p:
            p.starmap(worker, params)

De ce point, vous devez décider comment vous allez gérer la liste. Si vous avez des tonnes de RAM et un énorme ensemble de données, n'hésitez pas à concaténer en utilisant des pandas. Ensuite, vous pouvez enregistrer le fichier très facilement comme un csv ou un cornichon.

        df = pd.concat(row, ignore_index=True)

        df.to_pickle('data.pickle')
        df.to_csv('data.csv')

0
2017-12-21 17:02