Question Lecture non bloquante sur un sous-processus.PIPE en python


J'utilise le module de sous-processus pour démarrer un sous-processus et se connecter à son flux de sortie (stdout). Je veux être capable d'exécuter des lectures non bloquantes sur sa sortie standard. Est-il possible de faire en sorte que .readline ne bloque pas ou de vérifier s'il y a des données sur le flux avant d'appeler .readline? Je voudrais que ce soit portable ou au moins fonctionne sous Windows et Linux.

Voici comment je le fais pour l'instant (Il bloque sur le .readline si aucune donnée n'est disponible):

p = subprocess.Popen('myprogram.exe', stdout = subprocess.PIPE)
output_str = p.stdout.readline()

417
2017-12-17 17:56


origine


Réponses:


fcntl, select, asyncproc ne va pas aider dans ce cas.

Un moyen fiable de lire un flux sans bloquer quel que soit le système d'exploitation est d'utiliser Queue.get_nowait():

import sys
from subprocess import PIPE, Popen
from threading  import Thread

try:
    from Queue import Queue, Empty
except ImportError:
    from queue import Queue, Empty  # python 3.x

ON_POSIX = 'posix' in sys.builtin_module_names

def enqueue_output(out, queue):
    for line in iter(out.readline, b''):
        queue.put(line)
    out.close()

p = Popen(['myprogram.exe'], stdout=PIPE, bufsize=1, close_fds=ON_POSIX)
q = Queue()
t = Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True # thread dies with the program
t.start()

# ... do other things here

# read line without blocking
try:  line = q.get_nowait() # or q.get(timeout=.1)
except Empty:
    print('no output yet')
else: # got line
    # ... do something with line

346
2018-02-04 09:14



J'ai souvent eu un problème similaire; Les programmes Python que j'écris fréquemment doivent pouvoir exécuter certaines fonctionnalités primaires tout en acceptant simultanément les entrées utilisateur de la ligne de commande (stdin). Le simple fait de placer la fonctionnalité de gestion des entrées utilisateur dans un autre thread ne résout pas le problème car readline() blocs et n'a pas de délai d'expiration. Si la fonctionnalité principale est terminée et qu’il n’est plus nécessaire d’attendre d’autres entrées de la part de l’utilisateur, je souhaite généralement que mon programme se termine, mais ce n’est pas possible car readline() bloque toujours dans l'autre thread en attente d'une ligne. Une solution à ce problème est de rendre stdin un fichier non bloquant à l'aide du module fcntl:

import fcntl
import os
import sys

# make stdin a non-blocking file
fd = sys.stdin.fileno()
fl = fcntl.fcntl(fd, fcntl.F_GETFL)
fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

# user input handling thread
while mainThreadIsRunning:
      try: input = sys.stdin.readline()
      except: continue
      handleInput(input)

A mon avis, c'est un peu plus propre que d'utiliser les modules select ou signal pour résoudre ce problème, mais encore une fois, cela ne fonctionne que sous UNIX ...


69
2017-11-27 21:33



Python 3.4 introduit de nouvelles API provisoire pour IO asynchrone - asyncio module.

L'approche est similaire à twistedréponse basée sur @Bryan Ward - définir un protocole et ses méthodes sont appelées dès que les données sont prêtes:

#!/usr/bin/env python3
import asyncio
import os

class SubprocessProtocol(asyncio.SubprocessProtocol):
    def pipe_data_received(self, fd, data):
        if fd == 1: # got stdout data (bytes)
            print(data)

    def connection_lost(self, exc):
        loop.stop() # end loop.run_forever()

if os.name == 'nt':
    loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(loop.subprocess_exec(SubprocessProtocol, 
        "myprogram.exe", "arg1", "arg2"))
    loop.run_forever()
finally:
    loop.close()

Voir "Sous-processus" dans les docs.

Il y a une interface de haut niveau asyncio.create_subprocess_exec() ça revient Process objets qui permet de lire une ligne asynchrone en utilisant StreamReader.readline() coroutine  (avec async/await Syntaxe Python 3.5+):

#!/usr/bin/env python3.5
import asyncio
import locale
import sys
from asyncio.subprocess import PIPE
from contextlib import closing

async def readline_and_kill(*args):
    # start child process
    process = await asyncio.create_subprocess_exec(*args, stdout=PIPE)

    # read line (sequence of bytes ending with b'\n') asynchronously
    async for line in process.stdout:
        print("got line:", line.decode(locale.getpreferredencoding(False)))
        break
    process.kill()
    return await process.wait() # wait for the child process to exit


if sys.platform == "win32":
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()

with closing(loop):
    sys.exit(loop.run_until_complete(readline_and_kill(
        "myprogram.exe", "arg1", "arg2")))

readline_and_kill() effectue les tâches suivantes:

  • démarrer le sous-processus, rediriger son stdout vers un tube
  • lire une ligne du sous-processus 'stdout asynchrone
  • tuer le sous-processus
  • attendez qu'il sorte

Chaque étape peut être limitée par des secondes d'expiration si nécessaire.


36
2017-12-20 05:50



Essaie le asyncproc module. Par exemple:

import os
from asyncproc import Process
myProc = Process("myprogram.app")

while True:
    # check to see if process has ended
    poll = myProc.wait(os.WNOHANG)
    if poll != None:
        break
    # print any new output
    out = myProc.read()
    if out != "":
        print out

Le module prend en charge tous les threads comme suggéré par S.Lott.


20
2018-01-13 03:36



Vous pouvez le faire très facilement Tordu. Selon votre base de code existante, cela peut ne pas être aussi facile à utiliser, mais si vous créez une application tordue, des choses comme celles-ci deviennent presque triviales. Vous créez un ProcessProtocol classe, et remplacer le outReceived() méthode. Twisted (en fonction du réacteur utilisé) est généralement juste un gros select() boucle avec des callbacks installés pour gérer les données de différents descripteurs de fichiers (souvent des sockets réseau). Alors le outReceived() la méthode consiste simplement à installer un rappel pour gérer les données provenant de STDOUT. Un exemple simple démontrant ce comportement est le suivant:

from twisted.internet import protocol, reactor

class MyProcessProtocol(protocol.ProcessProtocol):

    def outReceived(self, data):
        print data

proc = MyProcessProtocol()
reactor.spawnProcess(proc, './myprogram', ['./myprogram', 'arg1', 'arg2', 'arg3'])
reactor.run()

le Documentation tordue a de bonnes informations à ce sujet.

Si vous construisez votre application entière autour de Twisted, cela rend la communication asynchrone avec d'autres processus, locaux ou distants, très élégants comme ça. D'un autre côté, si votre programme n'est pas basé sur Twisted, cela ne sera pas vraiment utile. Espérons que cela peut être utile à d'autres lecteurs, même si cela ne s'applique pas à votre application particulière.


17
2018-04-21 21:54



Utilisez select & read (1).

import subprocess     #no new requirements
def readAllSoFar(proc, retVal=''): 
  while (select.select([proc.stdout],[],[],0)[0]!=[]):   
    retVal+=proc.stdout.read(1)
  return retVal
p = subprocess.Popen(['/bin/ls'], stdout=subprocess.PIPE)
while not p.poll():
  print (readAllSoFar(p))

Pour readline () - comme:

lines = ['']
while not p.poll():
  lines = readAllSoFar(p, lines[-1]).split('\n')
  for a in range(len(lines)-1):
    print a
lines = readAllSoFar(p, lines[-1]).split('\n')
for a in range(len(lines)-1):
  print a

16
2018-01-26 01:02



Une solution consiste à faire un autre processus pour effectuer votre lecture du processus, ou faire un thread du processus avec un délai d'expiration.

Voici la version threadée d'une fonction de délai d'attente:

http://code.activestate.com/recipes/473878/

Cependant, avez-vous besoin de lire le stdout quand il arrive? Une autre solution consiste à vider la sortie dans un fichier et à attendre que le processus se termine p.wait ().

f = open('myprogram_output.txt','w')
p = subprocess.Popen('myprogram.exe', stdout=f)
p.wait()
f.close()


str = open('myprogram_output.txt','r').read()

8
2017-12-17 18:16



Disclaimer: cela ne fonctionne que pour la tornade

Vous pouvez le faire en définissant fd comme non bloquant, puis utiliser ioloop pour enregistrer les rappels. Je l'ai emballé dans un œuf appelé tornado_subprocess et vous pouvez l'installer via PyPI:

easy_install tornado_subprocess

maintenant vous pouvez faire quelque chose comme ceci:

import tornado_subprocess
import tornado.ioloop

    def print_res( status, stdout, stderr ) :
    print status, stdout, stderr
    if status == 0:
        print "OK:"
        print stdout
    else:
        print "ERROR:"
        print stderr

t = tornado_subprocess.Subprocess( print_res, timeout=30, args=[ "cat", "/etc/passwd" ] )
t.start()
tornado.ioloop.IOLoop.instance().start()

vous pouvez également l'utiliser avec un RequestHandler

class MyHandler(tornado.web.RequestHandler):
    def on_done(self, status, stdout, stderr):
        self.write( stdout )
        self.finish()

    @tornado.web.asynchronous
    def get(self):
        t = tornado_subprocess.Subprocess( self.on_done, timeout=30, args=[ "cat", "/etc/passwd" ] )
        t.start()

7
2017-07-06 12:42



Les solutions existantes ne fonctionnaient pas pour moi (détails ci-dessous). Ce qui a finalement fonctionné était d'implémenter readline en utilisant read (1) (basé sur cette réponse). Ce dernier ne bloque pas:

from subprocess import Popen, PIPE
from threading import Thread
def process_output(myprocess): #output-consuming thread
    nextline = None
    buf = ''
    while True:
        #--- extract line using read(1)
        out = myprocess.stdout.read(1)
        if out == '' and myprocess.poll() != None: break
        if out != '':
            buf += out
            if out == '\n':
                nextline = buf
                buf = ''
        if not nextline: continue
        line = nextline
        nextline = None

        #--- do whatever you want with line here
        print 'Line is:', line
    myprocess.stdout.close()

myprocess = Popen('myprogram.exe', stdout=PIPE) #output-producing process
p1 = Thread(target=process_output, args=(dcmpid,)) #output-consuming thread
p1.daemon = True
p1.start()

#--- do whatever here and then kill process and thread if needed
if myprocess.poll() == None: #kill process; will automatically stop thread
    myprocess.kill()
    myprocess.wait()
if p1 and p1.is_alive(): #wait for thread to finish
    p1.join()

Pourquoi les solutions existantes ne fonctionnaient pas:

  1. Les solutions qui nécessitent readline (y compris celles basées sur la file d'attente) bloquent toujours. Il est difficile (impossible?) De tuer le thread qui exécute readline. Il n'est tué que lorsque le processus qui l'a créé se termine, mais pas lorsque le processus de production de sortie est supprimé.
  2. Le mélange de fcntl de bas niveau avec des appels readline de haut niveau peut ne pas fonctionner correctement, comme l'a souligné anonnn.
  3. L'utilisation de select.poll () est soignée, mais ne fonctionne pas sous Windows conformément à python docs.
  4. L'utilisation de bibliothèques tierces semble exagérée pour cette tâche et ajoute des dépendances supplémentaires.

6
2018-03-15 04:40