
I am trying to import a large amount of data using Elasticsearch's parallel_bulk helper. This is my index structure:

{
    "_index" : "myindex",
    "_type" : domain,
    "_id" : md5(email),
    "_score" : 1.0,
    "_source" : {
      "purchase_date" : purchase_date,
      "amount" : amount,
    }
}

And this is my Python code:

def insert(input_file):
    paramL = []

    with open(input_file) as f:
        for line in f:
            line = line.rstrip()

            fields = line.split(',')
            purchase_date = fields[0]
            amount = fields[1]
            email = fields[2]               

            id_email = getMD5(email)

            doc = {
                "email": email,
                "purchase_date": purchase_date,
                "amount": amount _date
            }

            ogg = {
                '_op_type': 'index',
                '_index': index_param,
                '_type': doctype_param,
                '_id': id_email,
                '_source': doc
            }

            paramL.append(ogg)    

            if len(paramL) > 500000:
                for success, info in helpers.parallel_bulk(client=es, actions=paramL, thread_count=4):
                    if not success:
                        print "Insert failed: ", info

                # empty paramL once the batch has been sent
                del paramL[:]

The file contains 42,644,394 rows and my idea was to flush the data each time the list paramL reaches about 5,000,000 elements. When I run the script, it inserts about 436,226 values before crashing with the following error:

Traceback (most recent call last):
  File "test-2-0.py", line 133, in <module>
    main()
  File "test-2-0.py", line 131, in main
    insert(args.file)
  File "test-2-0.py", line 82, in insert
    for success, info in helpers.parallel_bulk(client=es, actions=paramL, thread_count=4):
  File "/usr/local/lib/python2.7/dist-packages/elasticsearch/helpers/__init__.py", line 306, in parallel_bulk
    _chunk_actions(actions, chunk_size, max_chunk_bytes, client.transport.serializer)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 668, in next
    raise value
elasticsearch.exceptions.ConnectionTimeout: ConnectionTimeout caused by - ReadTimeoutError(HTTPConnectionPool(host=u'127.0.0.1', port=9200): Read timed out. (read timeout=10))

I also tried to increase the timeout by passing it to the Elasticsearch constructor:

es = Elasticsearch(['127.0.0.1'], request_timeout=30)

but the result is the same.
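
As far as I understand, helpers.parallel_bulk forwards extra keyword arguments to the underlying bulk calls, so the timeout could presumably also be raised per request instead of on the client. A minimal sketch of what I mean, assuming that forwarding works the way I think it does:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch(['127.0.0.1'])

# Assumption: keyword arguments not consumed by parallel_bulk itself are
# passed through to the client's bulk() call, so request_timeout would
# apply to every bulk request instead of the default 10 seconds.
for success, info in helpers.parallel_bulk(client=es, actions=paramL,
                                           thread_count=4, request_timeout=60):
    if not success:
        print "Insert failed: ", info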


1 Answer


Honestly, I have never done a bulk import with so many documents to index, and I don't know why this error appears. In your case I suggest not building a list (paramL) but handling your data with a generator function, as described as best practice for large bulk ingests by an Elastic developer on the Elastic forum here: https://discuss.elastic.co/t/helpers-parallel-bulk-in-python-not-working/39498/3 . Something like this:

def insert(input_file):

    with open(input_file) as f:
        for line in f:
            line = line.rstrip()

            fields = line.split(',')
            purchase_date = fields[0]
            amount = fields[1]
            email = fields[2]               

            id_email = getMD5(email)

            doc = {
                "email": email,
                "purchase_attack": purchase_date,
                "amount _relevation": amount _date
            }

            yield {
                '_op_type': 'index',
                '_index': index_param,
                '_type': doctype_param,
                '_id': id_email,
                '_source': doc
            }



for success, info in helpers.parallel_bulk(client=es, actions=insert(input_file), thread_count=4):
    if not success:
        print "Insert failed: ", info

You can increase the memory dedicated to Elasticsearch's Java virtual machine by editing the file /etc/elasticsearch/jvm.options. To allocate 2 GB of RAM you change the two heap settings below; if your machine has 4 GB, you should keep about 1 GB for the system, so you could allocate at most 3 GB:

# Xms represents the initial size of total heap space
# Xmx represents the maximum size of total heap space

 -Xms2g
 -Xmx2g

Then you have to restart the service:

sudo service elasticsearch restart

And try again. Good luck!


7 Comments

Hi @luponaoide, I customized your suggestion (in my previous question). I had to call the parallel_bulk function that way because otherwise the script crashed once paramL collected more than about 7,000,000 values. In fact, I originally used a logic similar to yours, but as explained it crashes.
Hi @bit, have you tried using a generator, as in my answer, without using paramL?
Ciao @bit - I'm Italian too, from Rome, so I understand your log well: "connessione interrotta dal corrispondente", which in the English version is "connection reset by peer". This is a very annoying exception - not an error - because most of the time you can simply ignore it. It can be related to many different situations, so it would be better to ask for help in the official Elastic forum, which is monitored by the Elastic developers. However, it seems that the system is really under pressure. If you can allocate more RAM to Elasticsearch - I can show you how, if needed - I would try that first.
And I'm from Rome too... I owe you a coffee then! XD I would try to increase the performance. This is the current configuration: Intel Xeon 3199 MHz, 4 GB RAM, 4 GB swap.
@bit I've updated my answer with the instructions for increasing the RAM for ES. I'll gladly take the coffee, come on! XD The best thing would be to recommend me to your company if they ever need another Python/Elastic developer. It's a real mess to change jobs here!
