From 548d5fca3e1998d975ac128208829caec373888b Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Sun, 15 Mar 2020 05:43:27 +0300 Subject: [PATCH 1/4] bpo-39360: Ensure all workers exit when finalizing a multiprocessing.Pool --- Lib/multiprocessing/pool.py | 4 ++-- Lib/test/_test_multiprocessing.py | 15 +++++++++++++++ .../2020-03-15-05-41-05.bpo-39360.cmcU5p.rst | 2 ++ 3 files changed, 19 insertions(+), 2 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index b223d6aa724bb6..55cdb609a5558c 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -651,8 +651,6 @@ def close(self): def terminate(self): util.debug('terminating pool') self._state = TERMINATE - self._worker_handler._state = TERMINATE - self._change_notifier.put(None) self._terminate() def join(self): @@ -683,6 +681,8 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, util.debug('finalizing pool') worker_handler._state = TERMINATE + change_notifier.put(None) + task_handler._state = TERMINATE util.debug('helping task handler/workers to finish') diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index b985d51508cb75..e8c9227216c4d6 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2780,6 +2780,21 @@ def test_pool_worker_lifetime_early_close(self): for (j, res) in enumerate(results): self.assertEqual(res.get(), sqr(j)) + def test_pool_hang(self): + # tests cases against bpo-38744 and bpo-39360 + cmd = '''if 1: + from multiprocessing import Pool + class A: + def init(self): + self.pool = Pool(processes=1) + def do_something(x): + return x1 + problem = A() + problem.pool.map(do_something, [1,2,3]) + ''' + rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) + self.assertEqual(rc, 0) + # # Test of creating a customized manager class # diff --git a/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst b/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst new file mode 100644 index 00000000000000..85afe726dd20fe --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst @@ -0,0 +1,2 @@ +Ensure all workers exit when finalizing a :class:`multiprocessing.Pool`. +Patch by Batuhan Taskaya. From 207d611c9f832956b7a990108849ef3dc04bbcc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Batuhan=20Ta=C5=9Fkaya?= <47358913+isidentical@users.noreply.github.com> Date: Sun, 15 Mar 2020 06:02:12 +0300 Subject: [PATCH 2/4] Update Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst Co-Authored-By: Pablo Galindo --- Lib/test/_test_multiprocessing.py | 4 ++-- .../next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index e8c9227216c4d6..3c0d3c030bf3f4 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2785,10 +2785,10 @@ def test_pool_hang(self): cmd = '''if 1: from multiprocessing import Pool class A: - def init(self): + def __init__(self): self.pool = Pool(processes=1) def do_something(x): - return x1 + return x + 1 problem = A() problem.pool.map(do_something, [1,2,3]) ''' diff --git a/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst b/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst index 85afe726dd20fe..86b96012498f6d 100644 --- a/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst +++ b/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst @@ -1,2 +1,3 @@ -Ensure all workers exit when finalizing a :class:`multiprocessing.Pool`. -Patch by Batuhan Taskaya. +Ensure all workers exit when finalizing a :class:`multiprocessing.Pool` implicitly via the module finalization +handlers of multiprocessing. This fixes a deadlock situation that can be experienced when the Pool is not +properly finalized via the context manager or a call to ``multiprocessing.Pool.terminate``. Patch by Batuhan Taskaya. From 9f17cba72cb4e28aff254a7d44225f88ef873f2b Mon Sep 17 00:00:00 2001 From: Batuhan Taskaya Date: Sun, 15 Mar 2020 14:09:48 +0300 Subject: [PATCH 3/4] some changes --- Lib/multiprocessing/pool.py | 12 ++++++++++-- Lib/test/_test_multiprocessing.py | 2 +- .../Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst | 3 ++- 3 files changed, 13 insertions(+), 4 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 55cdb609a5558c..8bd9608b0e7dd7 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -651,6 +651,8 @@ def close(self): def terminate(self): util.debug('terminating pool') self._state = TERMINATE + self._worker_handler._state = TERMINATE + self._change_notifier.put(None) self._terminate() def join(self): @@ -680,8 +682,14 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, # this is guaranteed to only be called once util.debug('finalizing pool') - worker_handler._state = TERMINATE - change_notifier.put(None) + # Explicitly do the cleanup here if it didn't come from terminate() + # (required for if the queue will block, if it is already closed) + if worker_handler._state != TERMINATE: + # Notify that the worker_handler state has been changed so the + # _handle_workers loop can be unblocked (and exited) in order to + # send the finalization sentinel all the workers. + worker_handler._state = TERMINATE + change_notifier.put(None) task_handler._state = TERMINATE diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 3c0d3c030bf3f4..028d12112154ef 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2780,7 +2780,7 @@ def test_pool_worker_lifetime_early_close(self): for (j, res) in enumerate(results): self.assertEqual(res.get(), sqr(j)) - def test_pool_hang(self): + def test_worker_finalization_via_atexit_handler_of_multiprocessing(self): # tests cases against bpo-38744 and bpo-39360 cmd = '''if 1: from multiprocessing import Pool diff --git a/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst b/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst index 86b96012498f6d..148878886e6ee5 100644 --- a/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst +++ b/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst @@ -1,3 +1,4 @@ Ensure all workers exit when finalizing a :class:`multiprocessing.Pool` implicitly via the module finalization handlers of multiprocessing. This fixes a deadlock situation that can be experienced when the Pool is not -properly finalized via the context manager or a call to ``multiprocessing.Pool.terminate``. Patch by Batuhan Taskaya. +properly finalized via the context manager or a call to ``multiprocessing.Pool.terminate``. Patch by Batuhan Taskaya +and Pablo Galindo. From 0d008712438f310ac001d320e2d884ee794115e4 Mon Sep 17 00:00:00 2001 From: Pablo Galindo Date: Sun, 15 Mar 2020 18:01:41 +0000 Subject: [PATCH 4/4] Make it work with spawn --- Lib/multiprocessing/pool.py | 15 +++++---------- Lib/test/_test_multiprocessing.py | 11 +++++++---- 2 files changed, 12 insertions(+), 14 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 8bd9608b0e7dd7..41dd923d4f9740 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -651,8 +651,6 @@ def close(self): def terminate(self): util.debug('terminating pool') self._state = TERMINATE - self._worker_handler._state = TERMINATE - self._change_notifier.put(None) self._terminate() def join(self): @@ -682,14 +680,11 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, # this is guaranteed to only be called once util.debug('finalizing pool') - # Explicitly do the cleanup here if it didn't come from terminate() - # (required for if the queue will block, if it is already closed) - if worker_handler._state != TERMINATE: - # Notify that the worker_handler state has been changed so the - # _handle_workers loop can be unblocked (and exited) in order to - # send the finalization sentinel all the workers. - worker_handler._state = TERMINATE - change_notifier.put(None) + # Notify that the worker_handler state has been changed so the + # _handle_workers loop can be unblocked (and exited) in order to + # send the finalization sentinel all the workers. + worker_handler._state = TERMINATE + change_notifier.put(None) task_handler._state = TERMINATE diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 028d12112154ef..4a87b1761f9efe 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2784,13 +2784,16 @@ def test_worker_finalization_via_atexit_handler_of_multiprocessing(self): # tests cases against bpo-38744 and bpo-39360 cmd = '''if 1: from multiprocessing import Pool + problem = None class A: def __init__(self): self.pool = Pool(processes=1) - def do_something(x): - return x + 1 - problem = A() - problem.pool.map(do_something, [1,2,3]) + def test(): + global problem + problem = A() + problem.pool.map(float, tuple(range(10))) + if __name__ == "__main__": + test() ''' rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) self.assertEqual(rc, 0)