
So I'm learning to pull data from Elasticsearch through Apache Spark. Let's say I've connected to an Elasticsearch cluster that has a 'users' index.

from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)
usersES = sqlContext.read.format('org.elasticsearch.spark.sql').option('es.nodes', 'mynode').load('users/user')

usersES.explain() shows me this:

== Physical Plan ==

Scan ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147, uid#148]

When I use filter:

usersES.filter(usersES.uid==1566324).explain()

== Physical Plan ==
Filter (uid#203L = 1566324)
+- Scan ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147,uid#148] PushedFilters: [EqualTo(uid,1566324)]

As you can see, Spark elegantly pushes the filter down to Elasticsearch, making the index search fast and efficient.

But when I try joining usersES with another DataFrame, I run into the same issue every time: Spark scans the whole Elasticsearch index and does not push down any of the filters I give it. For example:

a = sc.parallelize([1566324, 1566329]).map(Row('id')).toDF()
a.join(usersES, usersES.uid == a.id).explain()

shows:

SortMergeJoin [id#210L], [uid#203L]
:- Sort [id#210L ASC], false, 0
:  +- TungstenExchange hashpartitioning(id#210L,200), None
:     +- ConvertToUnsafe
:        +- Scan ExistingRDD[id#210L]
+- Sort [uid#203L ASC], false, 0
   +- TungstenExchange hashpartitioning(uid#203L,200), None
      +- ConvertToUnsafe
         +- Scan ElasticsearchRelation(Map(es.nodes -> mynode, es.resource -> users/user),org.apache.spark.sql.SQLContext@6c78e806,None)[about#145,activities#146,bdate#147,uid#148]

Please tell me, is it possible to push the filter down to Elasticsearch inside the join?


1 Answer


This is expected behavior. Yes, the elasticsearch-hadoop connector supports predicate pushdown, but nothing is pushed down when you join.

This is because the join operation does not know anything about how the keys are partitioned in your dataframes.

By default, this operation will hash all the keys of both dataframes, sending all the elements with the same key hash across the network to the same machine, and then join together the elements with the same key on that machine.

And that's why you get that execution plan without the predicate being pushed down.
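If your other DataFrame is small enough to fit in memory, you can avoid the shuffle entirely with a broadcast join (also mentioned in the comments below). Note that this only removes the exchange; the Elasticsearch relation is still scanned in full, and no filter is pushed down. A minimal sketch, reusing the a and usersES DataFrames from the question:

from pyspark.sql.functions import broadcast

# Broadcast the small DataFrame 'a' to every executor so Spark plans a
# BroadcastHashJoin instead of SortMergeJoin + shuffle. The Elasticsearch
# side remains a full scan; only the network shuffle of the keys disappears.
usersES.join(broadcast(a), usersES.uid == a.id).explain()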

EDIT: It seems the connector has supported the IN clause since version 2.1. You ought to use that if your DataFrame a isn't big.
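A minimal sketch of that approach, assuming the ids in a are few enough to collect to the driver: collect them and filter with isin(), which Spark translates into an In filter that the connector (per the edit above, since 2.1) can push down to Elasticsearch:

# Collect the join keys to the driver (only viable for a small DataFrame)
ids = [row.id for row in a.collect()]

# isin() becomes a pushed In(uid, [...]) filter, so Elasticsearch only
# returns the matching documents instead of the whole index.
filtered = usersES.filter(usersES.uid.isin(ids))
filtered.join(a, filtered.uid == a.id).explain()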


Comments

Thanks! Seems like this is the only way, although it doesn't fit my needs (thousands of records in the df to put inside the IN).
Well then the only way is to use Spark to join instead of the IN clause, or you can even use a broadcast variable.
Can you please at least accept the answer to close the issue?
I have a similar need. I think I can fit one of the dataframes into a broadcast var. If that is the case (a broadcast-hash-join) will Spark be using Elastic's indexes?
@Avision what do you mean by « spark using elastic’s indexes » ?
