Date Tags python

Earlier today I was in the process of cleaning out some Chrome bookmarks when I came across a post by John M. Reese I bookmarked titled, Python: Using KeyboardInterrupt with a Multiprocessing Pool. I had bookmarked John’s post a number of months ago as it referenced my previous post, Python Multiprocessing and KeyboardInterrupt, however, not until today had I been able to look at his findings.

John suggests that by having the worker processes ignore SIGINT, the signal that results in python’s KeyboardInterrupt, the entire problem can be solved. Astute readers will note that I actually used the same approach in my second update to my aforementioned post, which suffered from the problem that intermediate results could not be processed, i.e., jobs that completed prior to the keyboard interrupt. While, John’s solution did educate me as to the existence of the initializer and initargs parameters to the multiprocessing.Pool function, his solution in-fact does not work. The only reason it appears to work is due to the time.sleep(10) in his try block. In most code this sleep call would not exist, rather the code would immediately call join() on the pool object.

In the absence of the delay introduced by the sleep call, John’s code still suffers from the original problem which is the KeyboardInterrupt exception does not reach the main process until all of the jobs have completed. The proper solution to the problem would be to fix the multiprocessing library to allow the join function to be interrupted. Until then, my suggestion of rolling your own pool functionality is the best solution I am aware of.

Below is a verbatim copy of my original solution for your convenience:

#!/usr/bin/env python
import multiprocessing, os, signal, time, Queue

def do_work():
    print 'Work Started: %d' % os.getpid()
    time.sleep(2)
    return 'Success'

def manual_function(job_queue, result_queue):
    signal.signal(signal.SIGINT, signal.SIG_IGN)
    while not job_queue.empty():
        try:
            job = job_queue.get(block=False)
            result_queue.put(do_work())
        except Queue.Empty:
            pass
        #except KeyboardInterrupt: pass

def main():
    job_queue = multiprocessing.Queue()
    result_queue = multiprocessing.Queue()

    for i in range(6):
        job_queue.put(None)

    workers = []
    for i in range(3):
        tmp = multiprocessing.Process(target=manual_function,
                                      args=(job_queue, result_queue))
        tmp.start()
        workers.append(tmp)

    try:
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        print 'parent received ctrl-c'
        for worker in workers:
            worker.terminate()
            worker.join()

    while not result_queue.empty():
        print result_queue.get(block=False)

if __name__ == "__main__":
    main()

Comments

comments powered by Disqus