3 回答

TA贡献1834条经验 获得超8个赞
#!/usr/bin/env python
"""Start process; wait 2 seconds; kill the process; print all process output."""
import subprocess
import sys
import tempfile
import time


def main(args=None, timeout=2):
    """Run a command for at most ``timeout`` seconds and print what it wrote.

    The command's stdout is redirected into an anonymous temporary file. The
    function waits until the command exits or ``timeout`` seconds have passed,
    terminates the command if it is still running, and then writes the saved
    output to ``sys.stdout``.

    Args:
        args: Command line as a list of strings; defaults to ``["top"]``.
        timeout: Maximum number of seconds to let the command run.

    Returns:
        The captured output as a string (the same text that was printed).
    """
    if args is None:
        args = ["top"]
    # The temporary file is deleted automatically when it is closed.  `Popen`
    # requires `f.fileno()`, so `SpooledTemporaryFile` adds nothing here.
    # Text mode ("w+") makes `read()` return `str` on Python 2 and Python 3.
    with tempfile.TemporaryFile(mode="w+") as f:
        # start process, redirect stdout
        p = subprocess.Popen(args, stdout=f)
        try:
            # wait up to `timeout` seconds, but stop early if the process
            # has already finished on its own
            deadline = time.time() + timeout
            while p.poll() is None and time.time() < deadline:
                time.sleep(0.05)
        finally:
            # kill the process only if it is still running
            if p.poll() is None:
                p.terminate()
            # wait for the process to terminate, otherwise the output is
            # garbled.  NOTE: if `terminate()` doesn't kill the process then
            # `p.wait()` blocks forever
            p.wait()
        # print saved output
        f.seek(0)  # rewind to the beginning of the file
        output = f.read()
    # no trailing newline is added, matching the original `print x,`
    sys.stdout.write(output)
    return output


if __name__ == "__main__":
    main()
类似 tail 的解决方案:只打印输出的最后一部分
import collections
import subprocess
import sys
import threading


def read_output(process, append):
    """Pass every line read from ``process.stdout`` to ``append`` until EOF.

    End of file is detected by ``readline()`` returning ``""``, so the pipe
    must be opened in text mode (see ``universal_newlines`` in ``main``).

    Args:
        process: A ``subprocess.Popen`` object whose stdout is a pipe.
        append: Callable invoked once per line, e.g. ``deque.append``.
    """
    for line in iter(process.stdout.readline, ""):
        append(line)


def main(args=None, timeout=2, number_of_lines=200):
    """Print the last lines a command produced within ``timeout`` seconds.

    A daemon thread reads the command's stdout into a bounded deque while the
    main thread waits; the command is terminated when the wait is over.

    Args:
        args: Command line as a list of strings; defaults to ``["top"]``.
        timeout: Maximum number of seconds to collect output.
        number_of_lines: How many trailing lines of output to keep.

    Returns:
        The kept lines joined into one string.  The printed text is this
        string followed by one newline, as the original ``print`` produced.
    """
    if args is None:
        args = ["top"]
    # save last `number_of_lines` lines of the process output; created before
    # the `try` so it is always bound when the output is printed
    q = collections.deque(maxlen=number_of_lines)  # atomic .append()
    # start process, redirect stdout; text mode keeps the "" sentinel used by
    # `read_output` valid on Python 3 as well as Python 2
    process = subprocess.Popen(args, stdout=subprocess.PIPE, close_fds=True,
                               universal_newlines=True)
    t = threading.Thread(target=read_output, args=(process, q.append))
    t.daemon = True  # a blocked reader must not keep the interpreter alive
    try:
        t.start()
        # wait for the end of the output, but no longer than `timeout`
        t.join(timeout)
    finally:
        if process.poll() is None:
            process.terminate()  # NOTE: it doesn't ensure the process termination
    # give the reader a bounded moment to drain the pipe so that `q` is
    # normally no longer being appended to when it is joined below
    t.join(1)
    process.poll()  # reap the child if it has already exited
    # print saved lines
    output = "".join(q)
    sys.stdout.write(output + "\n")
    return output


if __name__ == "__main__":
    main()
q.append()
signal.alarm()
解决方案
signal.alarm()
process.terminate()
subprocess
import collections
import signal
import subprocess
import sys


class Alarm(Exception):
    """Raised by ``alarm_handler`` when the SIGALRM timeout fires."""


def alarm_handler(signum, frame):
    """Signal handler that turns SIGALRM into an ``Alarm`` exception."""
    raise Alarm


def main(args=None, timeout=2, number_of_lines=200):
    """Print the last lines a command produced within ``timeout`` seconds.

    ``signal.alarm`` interrupts the blocking read loop after ``timeout``
    seconds; the command is then terminated.  Requires a platform that
    provides ``SIGALRM`` and must be called from the main thread, because
    that is where Python delivers signals.

    Args:
        args: Command line as a list of strings; defaults to ``["top"]``.
        timeout: Whole number of seconds before the alarm fires.
        number_of_lines: How many trailing lines of output to keep.

    Returns:
        The kept lines joined into one string.  The printed text is this
        string followed by one newline, as the original ``print`` produced.
    """
    if args is None:
        args = ["top"]
    # save last `number_of_lines` lines of the process output; created before
    # the `try` so the `finally` clause can always print it
    q = collections.deque(maxlen=number_of_lines)
    # start process, redirect stdout; text mode makes `readline()` return ""
    # at end of file on Python 3 too, so the loop below terminates
    process = subprocess.Popen(args, stdout=subprocess.PIPE, close_fds=True,
                               universal_newlines=True)
    # set signal handler, remembering the previous one so it can be restored
    previous_handler = signal.signal(signal.SIGALRM, alarm_handler)
    signal.alarm(timeout)  # produce SIGALRM in `timeout` seconds
    try:
        for line in iter(process.stdout.readline, ""):
            q.append(line)
        signal.alarm(0)  # cancel alarm
    except Alarm:
        process.terminate()
    finally:
        # cancel the alarm on every path (harmless if already cancelled) and
        # put the previous handler back; `signal.signal` returns None for a
        # handler that was not installed from Python, which cannot be re-set
        signal.alarm(0)
        if previous_handler is not None:
            signal.signal(signal.SIGALRM, previous_handler)
        process.stdout.close()
        process.poll()  # reap the child if it has already exited
        # print saved lines
        output = "".join(q)
        sys.stdout.write(output + "\n")
    return output


if __name__ == "__main__":
    main()
process.stdout.readline()
threading.Timer
解决方案
import collections
import subprocess
import sys
import threading


def main(args=None, timeout=2, number_of_lines=200):
    """Print the last lines a command produced within ``timeout`` seconds.

    A ``threading.Timer`` terminates the command after ``timeout`` seconds
    while the main thread reads its stdout until end of file.

    Args:
        args: Command line as a list of strings; defaults to ``["top"]``.
        timeout: Seconds to wait before the command is terminated.
        number_of_lines: How many trailing lines of output to keep.

    Returns:
        The kept lines joined into one string (the same text that is printed).
    """
    if args is None:
        args = ["top"]
    # start process, redirect stdout; text mode so the lines are `str` on
    # Python 3 and can be joined with "" below
    process = subprocess.Popen(args, stdout=subprocess.PIPE, close_fds=True,
                               universal_newlines=True)
    # terminate process in timeout seconds
    timer = threading.Timer(timeout, process.terminate)
    timer.start()
    try:
        # save last `number_of_lines` lines of the process output
        q = collections.deque(process.stdout, maxlen=number_of_lines)
    finally:
        # cancel on every path so a pending timer does not fire later
        timer.cancel()
        process.stdout.close()
        process.poll()  # reap the child if it has already exited
    # print saved lines; no trailing newline is added, matching the
    # original `print x,`
    output = "".join(q)
    sys.stdout.write(output)
    return output


if __name__ == "__main__":
    main()
process.stdout
iter(process.stdout.readline, "")
process.terminate()
不使用线程、也不使用信号的解决方案
import collections
import subprocess
import sys
import time


def main(args=None, timeout=2, number_of_lines=200):
    """Print the last lines a command produced within ``timeout`` seconds.

    Lines are read one at a time in the calling thread.  The deadline is only
    checked after each line has been read, so a command that stops producing
    output can keep ``readline()`` blocked past ``timeout``.

    Args:
        args: Command line as a list of strings.  When ``None`` the command
            is taken from ``sys.argv[1:]``; an empty command means ``top``.
        timeout: Seconds of reading after which the command is terminated.
        number_of_lines: How many trailing lines of output to keep.

    Returns:
        The kept lines joined into one string (the same text that is printed).
    """
    if args is None:
        args = sys.argv[1:]
    if not args:
        args = ['top']

    # start process, redirect stdout; text mode so the lines are `str` on
    # Python 3 and can be joined with "" below
    process = subprocess.Popen(args, stdout=subprocess.PIPE, close_fds=True,
                               universal_newlines=True)

    # save last `number_of_lines` lines of the process output
    q = collections.deque(maxlen=number_of_lines)

    now = start = time.time()
    try:
        while (now - start) < timeout:
            line = process.stdout.readline()
            if not line:
                break  # end of output: the command closed its stdout
            q.append(line)
            now = time.time()
        else:  # on timeout
            process.terminate()
    finally:
        process.stdout.close()
        process.poll()  # reap the child if it has already exited

    # print saved lines; no trailing newline is added, matching the
    # original `print x,`
    output = "".join(q)
    sys.stdout.write(output)
    return output


if __name__ == "__main__":
    main()
process.stdout.readline()

TA贡献1875条经验 获得超5个赞

TA贡献1869条经验 获得超4个赞
top >> tmp_file
cat tmp_file
import subprocess
import sys
import tempfile
import time


def main(args=None, delay=1):
    """Run a command for up to ``delay`` seconds and print its output.

    The command's stdout goes to an anonymous temporary file instead of a
    fixed ``tmp_file`` in the current directory, and only the process started
    here is terminated (the original ran ``killall top``, which also kills
    unrelated ``top`` processes).

    Args:
        args: Command line as a list of strings; defaults to ``["top"]``.
        delay: Maximum number of seconds to let the command run.

    Returns:
        The captured output as a string.  The printed text is this string
        followed by one newline, as the original ``print`` produced.
    """
    if args is None:
        args = ["top"]
    # text mode ("w+") makes `read()` return `str` on Python 2 and Python 3
    with tempfile.TemporaryFile(mode="w+") as tmp_file:
        child = subprocess.Popen(args, stdout=tmp_file)
        try:
            # let the command run for `delay` seconds unless it exits sooner
            deadline = time.time() + delay
            while child.poll() is None and time.time() < deadline:
                time.sleep(0.05)
        finally:
            if child.poll() is None:
                child.terminate()
            # wait so that all output is in the file before it is read.
            # NOTE: blocks if the child ignores the termination request
            child.wait()
        tmp_file.seek(0)
        process = tmp_file.read()
    sys.stdout.write(process + "\n")
    return process


# Something is better than nothing =)
if __name__ == "__main__":
    main()
添加回答
举报