I was able to test the implementation without multiprocessing using the code below.

import unittest
from unittest.mock import patch

def side_effect_cube(x):
    return x**3

@patch("test.data.geocode.test_geo_thread.cube", side_effect=side_effect_cube)
def test_sum(mock_cube):
    assert find_cube(7) == [1, 8, 27, 64, 125, 216]
    assert mock_cube.called

def find_cube(num):
    result = []
    for i in range(1,num):
        result.append(cube(i))
    return result

def cube(x):
    return x**3

However, when I added multiprocessing to the implementation, it started failing.

import unittest
from unittest.mock import patch
import multiprocessing as mp

def side_effect_cube(x):
    return x**3

@patch("test.data.geocode.test_geo_thread.cube", side_effect=side_effect_cube)
def test_sum(mock_cube):
    assert find_cube(7) == [1, 8, 27, 64, 125, 216]
    assert mock_cube.called

def find_cube(num):
    pool = mp.Pool(processes=4)
    result = [pool.apply(cube, args=(x,)) for x in range(1, num)]
    return result

def cube(x):
    return x**3

Below is the error that I see:

cls = <class 'multiprocessing.reduction.ForkingPickler'>, obj = (0, 0, <MagicMock name='cube' id='61482936'>, (1,), {}), protocol = None

    @classmethod
    def dumps(cls, obj, protocol=None):
        buf = io.BytesIO()
>       cls(buf, protocol).dump(obj)
E       _pickle.PicklingError: Can't pickle <class 'unittest.mock.MagicMock'>: it's not the same object as unittest.mock.MagicMock

    ..\..\appdata\local\programs\python\python37\lib\multiprocessing\reduction.py:51: PicklingError

1 Answer

This is an unresolved problem.

"Because the class for any individual Mock / MagicMock isn't available at the top level of the mock module I don't think this can be fixed (fundamental pickle limitation)." #139
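
To see the limitation in isolation, here is a minimal sketch that tries to pickle a MagicMock directly, which is what multiprocessing has to do before it can send the callable to a worker. The helper name is_picklable is made up for this illustration, and math.pow simply stands in for any importable top-level function.

```python
import math
import pickle
from unittest.mock import MagicMock


def is_picklable(obj):
    """Hypothetical helper: report whether pickle.dumps accepts obj."""
    try:
        pickle.dumps(obj)
    except Exception:
        # For a MagicMock this is the PicklingError shown in the question.
        return False
    return True


# A top-level function is pickled by reference, so this succeeds.
print(is_picklable(math.pow))                 # True

# Each mock instance gets its own class that cannot be looked up by name.
print(is_picklable(MagicMock(name="cube")))   # False
```

So the failure does not come from find_cube itself; any patched callable handed to Pool.apply hits the same wall.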

Here are some workarounds. If you don't need mock features such as the assert methods and return_value, you can create a custom class, based on issue #139 referenced above:

from unittest.mock import Mock

class PickableMock(Mock):
    def __reduce__(self):
        # Unpickle as a fresh, unconfigured Mock
        return (Mock, ())
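
As a rough illustration of the trade-off, the sketch below round-trips a PickableMock through pickle in a single process. It assumes nothing beyond the class above; the names original and clone are only for this example. The object that comes out the other side is a brand-new Mock, so configuration and call records do not carry over.

```python
import pickle
from unittest.mock import Mock


class PickableMock(Mock):
    def __reduce__(self):
        # Tell pickle to rebuild this object as a plain, unconfigured Mock.
        return (Mock, ())


original = PickableMock(return_value=42)

# Pickling now succeeds, but the result is a different, fresh Mock.
clone = pickle.loads(pickle.dumps(original))

# Calls made on the clone are not recorded on the original.
clone(1)
print(clone.called)               # True
print(original.called)            # False

# The configured return_value stayed behind on the original.
print(original())                 # 42
print(isinstance(clone(), Mock))  # True
```

In a worker process the clone plays the role of the mock that actually gets called, which is why assertions on the original in the parent process would not see those calls.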

As stated here, you can use a fork of the multiprocessing library. I recommend checking the thread to see the problems you may face with that.


The solution I used for my use case was to mock multiprocessing.Pool(), which I use as a context manager. Thanks to this thread, I was able to mock the pool and assert that the method was called with the correct arguments. Your code would look like this:

import multiprocessing
import unittest
from unittest.mock import patch


def cube(x):
    return x**3


def find_cube(num):
    with multiprocessing.Pool() as executor:
        return [executor.apply(cube, args=(x,)) for x in range(1, num)]


class MyTestCase(unittest.TestCase):
    @patch("multiprocessing.Pool")
    def test_cube(self, pool):
        # "with multiprocessing.Pool() as executor" binds executor to the
        # return value of __enter__, so that is the mock to inspect.
        executor = pool.return_value.__enter__.return_value
        num = 7
        find_cube(num)
        # One apply() call per input, with the inputs passed in order.
        self.assertEqual(num - 1, executor.apply.call_count)
        for i, (_, kwargs) in enumerate(executor.apply.call_args_list, start=1):
            self.assertEqual(i, kwargs["args"][0])
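
If you also want to check the values that find_cube returns, one possible variation is to give the mocked apply a side_effect that runs the function in the current process. This is only a sketch under that assumption: apply_inline and FindCubeInlineTest are names invented here, and nothing is sent to a worker process, so it tests the logic around the pool rather than the pool itself.

```python
import multiprocessing
import unittest
from unittest.mock import patch


def cube(x):
    return x ** 3


def find_cube(num):
    with multiprocessing.Pool() as executor:
        return [executor.apply(cube, args=(x,)) for x in range(1, num)]


def apply_inline(func, args=(), kwds=None):
    """Hypothetical stand-in for Pool.apply: call func in this process."""
    return func(*args, **(kwds or {}))


class FindCubeInlineTest(unittest.TestCase):
    @patch("multiprocessing.Pool")
    def test_results(self, pool):
        # The mock bound by "with multiprocessing.Pool() as executor".
        executor = pool.return_value.__enter__.return_value
        executor.apply.side_effect = apply_inline

        # The real cube runs in-process, so no mock is ever pickled.
        self.assertEqual(find_cube(7), [1, 8, 27, 64, 125, 216])
        self.assertEqual(executor.apply.call_count, 6)
```

Run it with python -m unittest or pytest as usual. Because cube itself is not patched here, the pickling problem never comes up.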