
I have a case much like this one:

Example DataFrame:

from pyspark.sql.types import *
schema = StructType([  # schema
    StructField("id", StringType(), True),
    StructField("email", ArrayType(StringType()), True)])
df = spark.createDataFrame([{"id": "id1"},
                            {"id": "id2", "email": None},
                            {"id": "id3","email": ["[email protected]"]},
                            {"id": "id4", "email": ["[email protected]", "[email protected]"]}],
                           schema=schema)
df.show(truncate=False)
+---+------------------------------------+
|id |email                               |
+---+------------------------------------+
|id1|null                                |
|id2|null                                |
|id3|[[email protected]]                  |
|id4|[[email protected], [email protected]]|
+---+------------------------------------+

I want to insert this data into Elasticsearch, and as far as I have researched, I have to transform it into an indexing format of (key, document) pairs:

def parseTest(r):
    # Build an (id, document) pair; omit the email key when it is null
    if r['email'] is None:
        return r['id'], {"id": r['id']}
    else:
        return r['id'], {"id": r['id'], "email": r['email']}

df2 = df.rdd.map(parseTest)
df2.top(4)
[('id4', {'email': ['[email protected]', '[email protected]'], 'id': 'id4'}),
 ('id3', {'email': ['[email protected]'], 'id': 'id3'}),
 ('id2', {'id': 'id2'}),
 ('id1', {'id': 'id1'})]

Then I try to insert:

es_conf = {"es.nodes": "node1.com,node2.com",
           "es.resource": "index/type"}
df2.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_conf)

And I get this:

org.apache.spark.SparkException: Data of type java.util.ArrayList cannot be used

Spark v 2.1.0
ES v 2.4.4

Without the email field it works fine. I found a proposed solution using es.output.json: true together with json.dumps, but it appeared to be for version 5, so I tried it on another cluster I have with ES v5:

import json

df3 = df2.map(json.dumps)
df3.top(4)
['["id4", {"email": ["[email protected]", "[email protected]"], "id": "id4"}]',
 '["id3", {"email": ["[email protected]"], "id": "id3"}]',
 '["id2", {"id": "id2"}]',
 '["id1", {"id": "id1"}]']
es_conf2 = {"es.nodes": "anothernode1.com,anothernode2.com",
            "es.output.json": "true",
            "es.resource": "index/type"}
df3.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_conf2)

Then I get:

RDD element of type java.lang.String cannot be used

Spark v 2.1.0
ES v 5.2.0

feelsbadman
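
My best guess at this point: json.dumps turned each (key, document) tuple into one bare string, while the connector seems to expect (key, value) pairs. Something like this is what I'd try next (untested sketch; es.input.json and the Text value class are my assumptions from the es-hadoop docs, not something I've verified):

import json

# Serialize only the document, keeping the (key, value) pair shape
df4 = df2.mapValues(json.dumps)

es_conf3 = {"es.nodes": "anothernode1.com,anothernode2.com",
            "es.input.json": "true",  # assumption: tells the connector the value is already JSON
            "es.resource": "index/type"}
df4.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.apache.hadoop.io.Text",  # assumption: plain text value class for JSON strings
    conf=es_conf3)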

1 Answer

I found another way to do the same job, using the write method of the DataFrame object.

So, following the first section:

from pyspark.sql.types import *
schema = StructType([  # schema
    StructField("id", StringType(), True),
    StructField("email", ArrayType(StringType()), True)])
df = spark.createDataFrame([{"id": "id1"},
                            {"id": "id2", "email": None},
                            {"id": "id3","email": ["[email protected]"]},
                            {"id": "id4", "email": ["[email protected]", "[email protected]"]}],
                           schema=schema)
df.show(truncate=False)
+---+------------------------------------+
|id |email                               |
+---+------------------------------------+
|id1|null                                |
|id2|null                                |
|id3|[[email protected]]                  |
|id4|[[email protected], [email protected]]|
+---+------------------------------------+

You just need to:

df.write\
    .format("org.elasticsearch.spark.sql")\
    .option("es.nodes","node1.com,node2.com")\
    .option("es.resource","index/type")\
    .option("es.mapping.id", "id")\
    .save()

No need to transform it into an RDD or modify it in any way; es.mapping.id tells the connector to use the id column as the document _id.
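
If you want to sanity-check what landed in the index, the same connector can read it back (a minimal sketch, assuming the same cluster and index; es.read.field.as.array.include tells the connector that email should come back as an array):

df_check = (spark.read
    .format("org.elasticsearch.spark.sql")
    .option("es.nodes", "node1.com,node2.com")
    .option("es.resource", "index/type")
    .option("es.read.field.as.array.include", "email")
    .load())
df_check.show(truncate=False)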
