
When I try to send bulk_data to my local Elasticsearch instance, the data isn't loaded because of a SerializationError.

I have already tried filling the empty cells in the CSV file, but that wasn't the solution.

from elasticsearch import Elasticsearch

bulk_data = []
header = []
count = 0
for row in csv_file_object:
    if count > 0:
        data_dict = {}
        for i in range(len(row)):
            row = row.rstrip()
            data_dict[header[i]] = row[i]
        op_dict = {
            "index": {
                "_index": INDEX_NAME, 
                "_type": TYPE_NAME, 
            }
        }
        bulk_data.append(op_dict)
        bulk_data.append(data_dict)
    else:
        header = row
    count = count+1

# create ES client, create index
es = Elasticsearch(hosts=[ES_HOST])
if es.indices.exists(INDEX_NAME):
    print("deleting '%s' index..." % (INDEX_NAME))
    res = es.indices.delete(index=INDEX_NAME)

res = es.bulk(index=INDEX_NAME, body=bulk_data, refresh=True)

See image for the SerializationError and bulk_data values:

[screenshot: SerializationError traceback and bulk_data values]

Please note: the \n is added by the serialization process itself.
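For reference, here is a minimal sketch of how the action/source pairs can be built so that every entry in bulk_data is JSON-serializable. The index/type names and the sample CSV rows are invented for illustration; the real code reads from csv_file_object instead:

```python
import csv
import io

# Invented sample CSV standing in for the real file.
CSV_TEXT = "id,count\n105,20\n110,30\n"

INDEX_NAME = "my_index"   # placeholder index name
TYPE_NAME = "my_type"     # placeholder type name

bulk_data = []
reader = csv.reader(io.StringIO(CSV_TEXT))
header = next(reader)                      # first row holds the field names
for row in reader:
    # csv.reader already strips the trailing newline, so no rstrip() is needed
    data_dict = dict(zip(header, row))     # map each header to its cell value
    bulk_data.append({"index": {"_index": INDEX_NAME, "_type": TYPE_NAME}})
    bulk_data.append(data_dict)
```

Because csv.reader parses each line into a list of strings, indexing it never picks out single characters, and dict(zip(header, row)) keeps field names and values aligned.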

  • no, "\n" is added because you don't strip row. It should be row = row.rstrip() Commented Apr 24, 2018 at 12:48
  • Does Elasticsearch transform your row string into bytes? Is that behaviour correct, or do you want to index only an array of numbers? Commented Apr 24, 2018 at 12:50
  • @Lupanoide, I want to add the 4 columns from the CSV, but currently I only get the index up to 120. See my edit where I loop over the row values to add them to data_dict. Commented Apr 24, 2018 at 12:55
  • you have 5 columns in your CSV, so the desired output should be {"105": [0, 10262233, 20, 10262233, 1], "110": ...}, right? Commented Apr 24, 2018 at 13:02
  • @Lupanoide Thank you for your help! Yes, the desired output should be {"105": [0, 10262233, 20, 10262233, 1], "110": ...}. The current output is something like {44: 50, 95: 51, 97: 49, 99: 51, 100: }. Commented Apr 24, 2018 at 13:11
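The shape discussed in the comments (first column as key, remaining columns as a list of numbers) can be sketched like this; the sample rows are invented from the values quoted above:

```python
import csv
import io

# Invented sample rows: first cell is the key, the rest are numeric values.
CSV_TEXT = "105,0,10262233,20,10262233,1\n110,1,10262234,21,10262234,2\n"

result = {}
for row in csv.reader(io.StringIO(CSV_TEXT)):
    key, *values = row                       # first cell keys the rest of the row
    result[key] = [int(v) for v in values]   # convert the remaining cells to ints
```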

1 Answer


I tried to answer you, but there is one thing I don't understand: how do you retrieve your field names from the data? In your code you read them from a list called header, which starts out empty, so I can't see where those values come from. Check my answer; I'm not sure I understood correctly.

from elasticsearch import Elasticsearch
from elasticsearch import helpers


index_name = "your_index_name"
doc_type = "your_doc_type"
esConnector = Elasticsearch(["http://192.168.1.1:9200/"])
# change your ip here

def generate_data(csv_file_object):
    with open(csv_file_object, "r") as f:
        header = next(f).rstrip().split(",")   # field names come from the first row
        for count, line in enumerate(f, start=1):
            values = line.rstrip().split(",")  # strip the trailing "\n" before splitting
            data_dict = dict(zip(header, values))
            yield {
                '_op_type': 'index',
                '_index': index_name,
                '_type': doc_type,
                '_id': count,
                '_source': data_dict,
            }


for success, info in helpers.parallel_bulk(client=esConnector, actions=generate_data(csv_file_object), thread_count=4):
    if not success:
        print('Doc failed', info)
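The generator can be exercised without a live cluster. This sketch feeds the same pattern an in-memory CSV; the index/type names are placeholders:

```python
import csv
import io

INDEX_NAME = "my_index"   # placeholder index name
DOC_TYPE = "my_type"      # placeholder doc type

def generate_actions(csv_text):
    reader = csv.reader(io.StringIO(csv_text))
    header = next(reader)                          # field names from the first row
    for doc_id, row in enumerate(reader, start=1):
        yield {
            "_op_type": "index",
            "_index": INDEX_NAME,
            "_type": DOC_TYPE,
            "_id": doc_id,
            "_source": dict(zip(header, row)),     # one document per CSV row
        }

actions = list(generate_actions("id,count\n105,20\n"))
```

Each yielded dict is exactly the action format that helpers.parallel_bulk (and helpers.bulk) expects.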

