
Given two Spark Datasets, A and B, I can do a join on a single column as follows:

a.joinWith(b, $"a.col" === $"b.col", "left")

My question is whether you can do a join using multiple columns, essentially the equivalent of the following DataFrame API code:

a.join(b, a("col") === b("col") && a("col2") === b("col2"), "left")

3 Answers


You can do it exactly the same way as with a DataFrame:

import spark.implicits._ // needed for toDS; assumes an active SparkSession named spark

val xs = Seq(("a", "foo", 2.0), ("x", "bar", -1.0)).toDS
val ys = Seq(("a", "foo", 2.0), ("y", "bar", 1.0)).toDS

xs.joinWith(ys, xs("_1") === ys("_1") && xs("_2") === ys("_2"), "left").show
// +------------+-----------+
// |          _1|         _2|
// +------------+-----------+
// | [a,foo,2.0]|[a,foo,2.0]|
// |[x,bar,-1.0]|       null|
// +------------+-----------+

In Spark < 2.0.0 you can use something like this:

xs.as("xs").joinWith(
  ys.as("ys"), ($"xs._1" === $"ys._1") && ($"xs._2" === $"ys._2"), "left")

There's another way of joining: chaining where operators one after another. You first specify a join (and optionally its type), followed by one or more where operators, i.e.

scala> case class A(id: Long, name: String)
defined class A

scala> case class B(id: Long, name: String)
defined class B

scala> val as = Seq(A(0, "zero"), A(1, "one")).toDS
as: org.apache.spark.sql.Dataset[A] = [id: bigint, name: string]

scala> val bs = Seq(B(0, "zero"), B(1, "jeden")).toDS
bs: org.apache.spark.sql.Dataset[B] = [id: bigint, name: string]

scala> as.join(bs).where(as("id") === bs("id")).show
+---+----+---+-----+
| id|name| id| name|
+---+----+---+-----+
|  0|zero|  0| zero|
|  1| one|  1|jeden|
+---+----+---+-----+


scala> as.join(bs).where(as("id") === bs("id")).where(as("name") === bs("name")).show
+---+----+---+----+
| id|name| id|name|
+---+----+---+----+
|  0|zero|  0|zero|
+---+----+---+----+

The reason this works is that the Spark optimizer merges (no pun intended) the consecutive wheres into the join condition itself. Use the explain operator to see the underlying logical and physical plans.

scala> as.join(bs).where(as("id") === bs("id")).where(as("name") === bs("name")).explain(extended = true)
== Parsed Logical Plan ==
Filter (name#31 = name#36)
+- Filter (id#30L = id#35L)
   +- Join Inner
      :- LocalRelation [id#30L, name#31]
      +- LocalRelation [id#35L, name#36]

== Analyzed Logical Plan ==
id: bigint, name: string, id: bigint, name: string
Filter (name#31 = name#36)
+- Filter (id#30L = id#35L)
   +- Join Inner
      :- LocalRelation [id#30L, name#31]
      +- LocalRelation [id#35L, name#36]

== Optimized Logical Plan ==
Join Inner, ((name#31 = name#36) && (id#30L = id#35L))
:- Filter isnotnull(name#31)
:  +- LocalRelation [id#30L, name#31]
+- Filter isnotnull(name#36)
   +- LocalRelation [id#35L, name#36]

== Physical Plan ==
*BroadcastHashJoin [name#31, id#30L], [name#36, id#35L], Inner, BuildRight
:- *Filter isnotnull(name#31)
:  +- LocalTableScan [id#30L, name#31]
+- BroadcastExchange HashedRelationBroadcastMode(List(input[1, string, false], input[0, bigint, false]))
   +- *Filter isnotnull(name#36)
      +- LocalTableScan [id#35L, name#36]


In Java, the && operator is not available on Column, so conditions are combined with the and method instead. The correct way to join on multiple columns in the Java API is:

            Dataset<Row> datasetRf1 = joinedWithDays.join(
                    datasetFreq, 
                    datasetFreq.col("userId").equalTo(joinedWithDays.col("userId"))
                    .and(datasetFreq.col("artistId").equalTo(joinedWithDays.col("artistId"))),
                            "inner"
                    );

The and method on Column works like Scala's && operator.
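
For comparison, the same and method also exists on Column in the Scala API, so this style works there too. A small sketch reusing the xs and ys Datasets from the first answer:

xs.joinWith(ys, (xs("_1") === ys("_1")).and(xs("_2") === ys("_2")), "left")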
