I am testing a class that uses bulk from elasticsearch.helpers to index some documents.

Here is my code:

import mock
import unittest
import json

from elasticsearch.helpers import bulk

from ingestion.ingestor import Ingestor

class TestIngestor(unittest.TestCase):

    def setUp(self):
        self.ingestor = Ingestor()

    @mock.patch("elasticsearch.helpers.bulk", mock.MagicMock(return_value=True))
    def test_ingestor(self):
        with open("tests/data/sample_payload.json", "r") as reader:
            sample_messages = json.loads(reader.read())["Messages"]

        actions = self.ingestor.ingest(sample_messages)

        self.assertEqual(len(actions), 10)

However, the mock does not seem to take effect: when I run the test, I get a long list of connection-refused errors.

How do I fix it?

1 Answer

It turns out that my patch target was wrong. Patching the client's bulk method, which the bulk helper calls internally, fixed it:

@mock.patch("elasticsearch.Elasticsearch.bulk",
            mock.MagicMock(return_value={"items": []}))
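
For anyone wondering why the original patch had no effect: a likely explanation (an assumption, since the Ingestor source is not shown) is that ingestion.ingestor does from elasticsearch.helpers import bulk. That statement binds the name bulk inside the ingestor module, so replacing elasticsearch.helpers.bulk afterwards leaves the ingestor's own reference untouched. The sketch below reproduces this with two made-up stand-in modules, fake_helpers and fake_ingestor, so it runs without elasticsearch installed. None of these names come from the real library.

```python
import sys
import types
from unittest import mock

# Hypothetical stand-in for elasticsearch.helpers: its bulk always fails,
# mimicking the connection-refused errors seen without a running cluster.
fake_helpers = types.ModuleType("fake_helpers")


def _real_bulk(client, actions):
    raise ConnectionError("connection refused")


fake_helpers.bulk = _real_bulk
sys.modules["fake_helpers"] = fake_helpers

# Hypothetical stand-in for ingestion.ingestor. The from-import copies the
# reference to bulk into this module's own namespace at import time.
fake_ingestor = types.ModuleType("fake_ingestor")
exec(
    "from fake_helpers import bulk\n"
    "def ingest(messages):\n"
    "    bulk(None, messages)\n"
    "    return list(messages)\n",
    fake_ingestor.__dict__,
)
sys.modules["fake_ingestor"] = fake_ingestor


def patch_defining_module():
    # Patches the name where bulk is defined; fake_ingestor still holds
    # its own reference to the original function.
    with mock.patch("fake_helpers.bulk", mock.MagicMock(return_value=True)):
        try:
            fake_ingestor.ingest([1, 2, 3])
        except ConnectionError:
            return "real bulk was called"
    return "mock was used"


def patch_using_module():
    # Patches the name where bulk is looked up, so ingest() sees the mock.
    with mock.patch("fake_ingestor.bulk", mock.MagicMock(return_value=True)):
        return fake_ingestor.ingest([1, 2, 3])
```

Under that assumption, an alternative to patching Elasticsearch.bulk would be to patch "ingestion.ingestor.bulk" directly, which keeps the test independent of what the helper does internally.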
1 Comment

I faced similar issues while mocking this library. It is quite hard to get the paths right because of how modules are imported (even more so with the _async resources). Say I am using from elasticsearch.helpers import async_bulk: what would the path be?
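
A possible answer for the async case, as a sketch rather than a tested recipe: the same rule should apply, so the target would be the module that imports async_bulk, not elasticsearch.helpers. The example uses made-up stand-in modules (fake_async_helpers, fake_async_ingestor) so that it runs without elasticsearch installed. The (success_count, errors) return shape is an assumption about what the helper returns.

```python
import asyncio
import sys
import types
from unittest import mock

# Hypothetical stand-in for elasticsearch.helpers with a failing async_bulk.
fake_async_helpers = types.ModuleType("fake_async_helpers")


async def _real_async_bulk(client, actions):
    raise ConnectionError("connection refused")


fake_async_helpers.async_bulk = _real_async_bulk
sys.modules["fake_async_helpers"] = fake_async_helpers

# Hypothetical module under test; it from-imports async_bulk, so the name
# to patch lives in this module's namespace.
fake_async_ingestor = types.ModuleType("fake_async_ingestor")
exec(
    "from fake_async_helpers import async_bulk\n"
    "async def ingest(messages):\n"
    "    success, errors = await async_bulk(None, messages)\n"
    "    return success\n",
    fake_async_ingestor.__dict__,
)
sys.modules["fake_async_ingestor"] = fake_async_ingestor


async def run_with_patch():
    # Assumed return shape: (success_count, errors).
    fake_result = (3, [])
    # AsyncMock makes the replacement awaitable, unlike a plain MagicMock.
    with mock.patch(
        "fake_async_ingestor.async_bulk",
        new=mock.AsyncMock(return_value=fake_result),
    ) as fake_bulk:
        result = await fake_async_ingestor.ingest(["a", "b", "c"])
        fake_bulk.assert_awaited_once()
    return result
```

In a real project the target string would be something like "your_package.your_module.async_bulk", where your_package.your_module is a placeholder for the module that contains the from-import.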
