Using nested multiprocessing pools in Python
I'm processing image files, which are represented as 2D numpy arrays in Python, and I need to iterate over the rows of each array to apply a function.
This is a follow-up question to a previous question of mine, which has more details about my processing chain. But here's the skinny:
The image files are stored on disk independently of each other. I read each file into memory and then use a multiprocessing pool to apply the function to each row.
Now I wanted to additionally speed up processing multiple files by also using a pool over the images themselves. The end result would be a pool of workers, each processing one image, with each of those workers in turn starting its own pool of workers to process the rows of its image.
I quickly ran into issues because daemonic processes are not allowed to have children, but I did find a nice workaround on StackOverflow.
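(For reference, the default restriction is easy to reproduce. The snippet below is only a minimal sketch with made-up function names, not part of my actual code; constructing a pool inside a regular pool worker fails because that worker is daemonic:)

import multiprocessing
from contextlib import closing

def inner(i):
    return i * 2

def outer(_):
    # The worker running this function is daemonic, so constructing a pool here
    # raises AssertionError: daemonic processes are not allowed to have children
    with closing(multiprocessing.Pool(2)) as p:
        return p.map(inner, range(5))

if __name__ == '__main__':
    with closing(multiprocessing.Pool(2)) as p:
        print(p.map(outer, range(3)))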
I've adapted the code from my previous question to use this "nested" pool setup, running 10 images in parallel via process_image:
import numpy as np
import ctypes
import array
import multiprocessing
import multiprocessing.pool
import random

from contextlib import contextmanager, closing


def init_shared(ncell):
    '''Create shared value array for processing.'''
    shared_array_base = multiprocessing.Array(ctypes.c_float, ncell, lock=False)
    return shared_array_base


def tonumpyarray(shared_array):
    '''Create numpy array from shared memory.'''
    nparray = np.frombuffer(shared_array, dtype=ctypes.c_float)
    assert nparray.base is shared_array
    return nparray


def init_parameters(**kwargs):
    '''Initialize parameters for processing in workers.'''
    params = dict()
    for key, value in kwargs.items():
        params[key] = value
    return params


def init_worker(shared_array_, parameters_):
    '''Initialize worker for processing.

    Args:
        shared_array_: Object returned by init_shared
        parameters_: Dictionary returned by init_parameters
    '''
    global arr
    global parr
    global dim
    global xfact

    arr = tonumpyarray(shared_array_)
    arr.shape = parameters_['dimensions']
    xfact = parameters_['x']
    parr = tonumpyarray(parameters_['shared_parr'])


def worker_fun(ix):
    '''Function to be run inside each worker'''
    parr[ix] = ix * xfact
    arr[ix, ...] = arr[ix, ...] * parr[ix]


def process_image(x):
    i = Image(100, 10, x)


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False

    def _set_daemon(self, value):
        pass

    daemon = property(_get_daemon, _set_daemon)


# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class Pool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess


##----------------------------------------------------------------------
class Image:
    def __init__(self, nrows, ncols, x):
        shared_array = init_shared(nrows * ncols)
        shared_parr = init_shared(nrows)

        params = init_parameters(shared_parr=shared_parr, dimensions=(nrows, ncols), x=x)

        arr = tonumpyarray(shared_array)
        parr = tonumpyarray(params['shared_parr'])

        arr.shape = (nrows, ncols)
        arr[...] = np.random.randint(1, 100, size=(nrows, ncols), dtype='int16')

        with closing(Pool(processes=2, initializer=init_worker, initargs=(shared_array, params))) as pool:
            res = pool.map(worker_fun, range(arr.shape[0]))

        pool.close()
        pool.join()

        # check PARR output
        print(parr)


def main():
    with closing(Pool(processes=4)) as pool:
        res = pool.map(process_image, range(10))


if __name__ == '__main__':
    multiprocessing.freeze_support()
    main()
Output:
[ 0. 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17.
18. 19. 20. 21. 22. 23. 24. 25. 26. 27. 28. 29. 30. 31. 32. 33. 34. 35.
36. 37. 38. 39. 40. 41. 42. 43. 44. 45. 46. 47. 48. 49. 50. 51. 52. 53.
54. 55. 56. 57. 58. 59. 60. 61. 62. 63. 64. 65. 66. 67. 68. 69. 70. 71.
72. 73. 74. 75. 76. 77. 78. 79. 80. 81. 82. 83. 84. 85. 86. 87. 88. 89.
90. 91. 92. 93. 94. 95. 96. 97. 98. 99.]
[ 0. 3. 6. 9. 12. 15. 18. 21. 24. 27. 30. 33. 36. 39.
42. 45. 48. 51. 54. 57. 60. 63. 66. 69. 72. 75. 78. 81.
84. 87. 90. 93. 96. 99. 102. 105. 108. 111. 114. 117. 120. 123.
126. 129. 132. 135. 138. 141. 144. 147. 150. 153. 156. 159. 162. 165.
168. 171. 174. 177. 180. 183. 186. 189. 192. 195. 198. 201. 204. 207.
210. 213. 216. 219. 222. 225. 228. 231. 234. 237. 240. 243. 246. 249.
252. 255. 258. 261. 264. 267. 270. 273. 276. 279. 282. 285. 288. 291.
294. 297.]
...
The questions:
I know that by default you can't create nested pools like this, because there can be issues with zombie processes. But given that the approach seems to be working, and given the embarrassingly parallel nature of my problem, I'm wondering:
- Is parallelization like this a valid approach?
- Is there maybe a different, better approach to this issue? (A rough sketch of one alternative I could imagine follows below.)
- Are there other caveats to this approach which I haven't seen?
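For reference, the alternative I could imagine instead of nesting pools would be a single flat pool over all (image, row) tasks. This is only a rough sketch with hypothetical helper names (process_row, main_flat), not code I'm currently running:

import multiprocessing
from contextlib import closing

def process_row(task):
    '''Hypothetical worker: process one (image_index, row_index) pair.'''
    image_index, row_index = task
    # ... look up the shared array for image_index and apply the row function ...
    return image_index, row_index

def main_flat(n_images=10, n_rows=100):
    # One flat task list instead of a pool of pools
    tasks = [(img, row) for img in range(n_images) for row in range(n_rows)]
    with closing(multiprocessing.Pool(processes=4)) as pool:
        results = pool.map(process_row, tasks)
    pool.join()
    return results

if __name__ == '__main__':
    multiprocessing.freeze_support()
    main_flat()

The appeal would be that scheduling stays with a single pool, but I would still need to work out how each worker gets at the right image's shared array.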
Thanks for your input.
python array concurrency multiprocessing
asked Nov 13 at 13:47 by Val