Using nested multiprocessing pools in python























I'm processing image files, which are represented as 2D NumPy arrays in Python, and I need to iterate over the rows of each array to apply a function.



This is a follow-up question to a previous question of mine, which has more details about my processing chain.



But here's the skinny:



The image files are stored independently on disk. I read each file into memory and then use a multiprocessing pool to apply the function to each row.



Now I also want to speed up the processing of multiple files by using a second pool for the images themselves. The end result would be a pool of workers that each process one image, with each worker in turn starting its own pool of workers to process the rows of that image.
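To make the size of such a setup concrete, here is a rough sketch of the process arithmetic, using the pool sizes from my code (4 image workers, 2 row workers each). The helper names and the even-split sizing rule are hypothetical illustrations, not part of my processing chain.

```python
def nested_process_count(outer_workers, inner_workers):
    """Worker processes alive when every outer worker has its inner pool open."""
    return outer_workers + outer_workers * inner_workers


def inner_pool_size(cpu_budget, outer_workers):
    """Hypothetical sizing rule: split a CPU budget evenly across the image
    workers, keeping at least one row worker per image."""
    return max(1, cpu_budget // outer_workers)


# 4 image workers, each with 2 row workers (parent process not counted)
print(nested_process_count(4, 2))  # 12
# With 8 cores and 4 image workers, each inner pool would get 2 workers
print(inner_pool_size(8, 4))       # 2
```

The worker count grows as the product of the two pool sizes, so it can exceed the number of available cores quickly.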



I quickly ran into issues because daemonic processes are not allowed to have children, but I did find a nice workaround on StackOverflow.
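As a minimal sketch of what the workaround does: pool workers are normally flagged as daemonic, and the workaround overrides the `daemon` property so the flag always reads `False`. The snippet below only inspects the flag and starts no processes; `noop` is a placeholder target added for illustration.

```python
import multiprocessing


class NoDaemonProcess(multiprocessing.Process):
    """Process whose 'daemon' flag always reads False."""

    def _get_daemon(self):
        return False

    def _set_daemon(self, value):
        pass  # silently ignore attempts to mark the process as daemonic

    daemon = property(_get_daemon, _set_daemon)


def noop():
    """Placeholder target; never actually run in this sketch."""


plain = multiprocessing.Process(target=noop)
plain.daemon = True      # a regular process accepts the flag

patched = NoDaemonProcess(target=noop)
patched.daemon = True    # the patched process ignores it

print(plain.daemon, patched.daemon)  # True False
```

Because the pool sets `daemon = True` on each worker before starting it, ignoring that assignment is what lets the workers create children of their own.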



I've adapted the code from my previous question to use this "nested" pool setup, processing 10 images with process_image in an outer pool of 4 workers:



import numpy as np
import ctypes
import array
import multiprocessing
import multiprocessing.pool
import random
from contextlib import contextmanager, closing


def init_shared(ncell):
    '''Create shared value array for processing.'''
    shared_array_base = multiprocessing.Array(ctypes.c_float,ncell,lock=False)
    return(shared_array_base)

def tonumpyarray(shared_array):
    '''Create numpy array from shared memory.'''
    nparray= np.frombuffer(shared_array,dtype=ctypes.c_float)
    assert nparray.base is shared_array
    return nparray

def init_parameters(**kwargs):
    '''Initialize parameters for processing in workers.'''

    params = dict()

    for key, value in kwargs.items():
        params[key] = value
    return params


def init_worker(shared_array_,parameters_):
    '''Initialize worker for processing.

    Args:
        shared_array_: Object returned by init_shared
        parameters_: Dictionary returned by init_parameters
    '''
    global arr
    global parr
    global dim
    global xfact

    arr = tonumpyarray(shared_array_)
    arr.shape = parameters_['dimensions']
    xfact = parameters_['x']
    parr = tonumpyarray(parameters_['shared_parr'])

def worker_fun(ix):
    '''Function to be run inside each worker'''

    parr[ix] = ix * xfact

    arr[ix,...] = arr[ix,...] * parr[ix]

def process_image(x):

    i = Image(100,10,x)


class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

# We sub-class multiprocessing.pool.Pool instead of multiprocessing.Pool
# because the latter is only a wrapper function, not a proper class.
class Pool(multiprocessing.pool.Pool):
    Process = NoDaemonProcess


##----------------------------------------------------------------------


class Image:

    def __init__(self,nrows,ncols,x):

        shared_array = init_shared(nrows*ncols)
        shared_parr = init_shared(nrows)

        params = init_parameters(shared_parr=shared_parr,dimensions=(nrows,ncols),x=x)

        arr = tonumpyarray(shared_array)
        parr = tonumpyarray(params['shared_parr'])

        arr.shape = (nrows,ncols)

        # fill with random test data; size follows the requested dimensions
        arr[...] = np.random.randint(1,100,size=(nrows,ncols),dtype='int16')

        with closing(Pool(processes=2,initializer = init_worker, initargs = (shared_array,params))) as pool:

            res = pool.map(worker_fun,range(arr.shape[0]))

        pool.close()
        pool.join()

        # check PARR output
        print(parr)


def main():

    with closing(Pool(processes=4)) as pool:

        res = pool.map(process_image,range(10))

if __name__ == '__main__':
    multiprocessing.freeze_support()
    main()



Output:



[ 0. 1. 2. 3. 4. 5. 6. 7. 8. 9. 10. 11. 12. 13. 14. 15. 16. 17.
18. 19. 20. 21. 22. 23. 24. 25. 26. 27. 28. 29. 30. 31. 32. 33. 34. 35.
36. 37. 38. 39. 40. 41. 42. 43. 44. 45. 46. 47. 48. 49. 50. 51. 52. 53.
54. 55. 56. 57. 58. 59. 60. 61. 62. 63. 64. 65. 66. 67. 68. 69. 70. 71.
72. 73. 74. 75. 76. 77. 78. 79. 80. 81. 82. 83. 84. 85. 86. 87. 88. 89.
90. 91. 92. 93. 94. 95. 96. 97. 98. 99.]



[ 0. 3. 6. 9. 12. 15. 18. 21. 24. 27. 30. 33. 36. 39.
42. 45. 48. 51. 54. 57. 60. 63. 66. 69. 72. 75. 78. 81.
84. 87. 90. 93. 96. 99. 102. 105. 108. 111. 114. 117. 120. 123.
126. 129. 132. 135. 138. 141. 144. 147. 150. 153. 156. 159. 162. 165.
168. 171. 174. 177. 180. 183. 186. 189. 192. 195. 198. 201. 204. 207.
210. 213. 216. 219. 222. 225. 228. 231. 234. 237. 240. 243. 246. 249.
252. 255. 258. 261. 264. 267. 270. 273. 276. 279. 282. 285. 288. 291.
294. 297.]



...




The questions:



I know that by default you can't create these nested pools, because there can be issues with zombie processes.



But given that the approach seems to be working, and given the embarrassingly parallel nature of my problem, I'm wondering:




  • Is parallelization like this a valid approach?

  • Is there maybe a different, better approach to this issue?

  • Are there other caveats to this approach which I haven't seen?
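For comparison with the second question, one flat layout would put every row of every image into a single ordinary pool, which avoids the non-daemonic workaround. This is a minimal sketch under assumptions: `row_tasks`, `process_row` and `run_flat` are hypothetical names, and `process_row` returns its result instead of writing into shared memory as my real code does.

```python
from itertools import product
from multiprocessing import Pool


def row_tasks(n_images, n_rows):
    """Enumerate every (image index, row index) pair as one flat task list."""
    return list(product(range(n_images), range(n_rows)))


def process_row(task):
    """Stand-in for the real per-row function; mirrors parr[ix] = ix * xfact,
    with the image index playing the role of xfact."""
    image_ix, row_ix = task
    return image_ix, row_ix, float(row_ix * image_ix)


def run_flat(n_images, n_rows, workers=4):
    """Map all rows of all images over a single, ordinary pool."""
    with Pool(processes=workers) as pool:
        # Tasks are ordered image by image, so chunksize=n_rows hands a
        # worker one whole image's rows at a time.
        return pool.map(process_row, row_tasks(n_images, n_rows),
                        chunksize=n_rows)
```

Calling `run_flat(10, 100)` under an `if __name__ == '__main__':` guard would correspond to the 10 images of 100 rows used above. Whether this beats the nested layout depends on how expensive reading each image is compared with processing its rows.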


Thanks for your input.










      python array concurrency multiprocessing
















asked Nov 13 at 13:47 by Val


























