
I have two Spark DataFrames:

cities DataFrame with the following column:

city
-----
London
Austin

bigCities DataFrame with the following column:

name
------
London
Cairo

I need to transform the cities DataFrame and add an additional Boolean column, bigCity. The value of this column must be calculated based on the condition "cities.city IN bigCities.name".

I can do this in the following way (with a static bigCities collection):

cities.createOrReplaceTempView("cities")

val resultDf = spark.sql("SELECT city, CASE WHEN city IN ('London', 'Cairo') THEN 'Y' ELSE 'N' END AS bigCity FROM cities")

but I don't know how to replace the static collection of names with the bigCities DataFrame in the query. I want to use bigCities as the reference data in the query.

Please advise how to achieve this.

2 Comments

  • You can use a left join between these two datasets on the cities.city and bigCities.name columns. If the value coming from the right table is null in the result, you can flag the row as false with withColumn and the when function in Spark. Commented Jan 21, 2019 at 12:00
  • Another solution is also possible: collect the unique values of the bigCities.name column into a list. Afterwards, you can filter or flag your cities Dataset with the isin function of Column. Commented Jan 21, 2019 at 12:04
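The second comment's idea can be sketched without Spark at all, on plain Scala collections, to make the logic visible. This is only an illustration (the names `cityRows` and `bigCityNames` are hypothetical): collecting the distinct `bigCities.name` values locally and testing membership is what `Column.isin` does per row.

```scala
// Plain-Scala sketch of "collect unique names, then flag by membership".
// In Spark this would be roughly:
//   val names = bigCities.select("name").distinct.collect.map(_.getString(0))
//   cities.withColumn("bigCity", when($"city".isin(names: _*), "Y").otherwise("N"))
val cityRows = Seq("London", "Austin")
val bigCityNames: Set[String] = Set("London", "Cairo") // distinct reference values

val flagged = cityRows.map(c => (c, if (bigCityNames.contains(c)) "Y" else "N"))
println(flagged) // List((London,Y), (Austin,N))
```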

2 Answers

import org.apache.spark.sql.functions.when
import spark.implicits._ // for the $"..." column syntax

val df = cities.join(bigCities, $"name".equalTo($"city"), "leftouter")
  .withColumn("bigCity", when($"name".isNull, "N").otherwise("Y"))
  .drop("name")
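The join-then-flag semantics above can be reproduced on plain Scala collections, which may help if you want to check the logic without spinning up a Spark session. This is a hedged sketch with hypothetical names, not Spark code:

```scala
// Left outer join on city == name, simulated with Option:
// unmatched cities keep None on the right side.
val cityRows    = Seq("London", "Austin")
val bigCityRows = Seq("London", "Cairo")

val joined: Seq[(String, Option[String])] =
  cityRows.map(c => (c, bigCityRows.find(_ == c)))

// when($"name".isNull, "N").otherwise("Y") becomes a test on the Option.
val result = joined.map { case (city, matched) => (city, if (matched.isDefined) "Y" else "N") }
println(result) // List((London,Y), (Austin,N))
```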



You can use collect_list() on the bigCities table. Check this out:

scala> val df_city = Seq(("London"),("Austin")).toDF("city")
df_city: org.apache.spark.sql.DataFrame = [city: string]

scala> val df_bigCities = Seq(("London"),("Cairo")).toDF("name")
df_bigCities: org.apache.spark.sql.DataFrame = [name: string]

scala> df_city.createOrReplaceTempView("cities")

scala> df_bigCities.createOrReplaceTempView("bigCities")

scala> spark.sql(" select city, case when array_contains((select collect_list(name) from bigcities),city) then 'Y' else 'N' end as bigCity from cities").show(false)
+------+-------+
|city  |bigCity|
+------+-------+
|London|Y      |
|Austin|N      |
+------+-------+



If the dataset contains duplicate names, you can use collect_set instead, which removes duplicates and is therefore more efficient.

scala> spark.sql(" select city, case when array_contains((select collect_set(name) from bigcities),city) then 'Y' else 'N' end as bigCity from cities").show(false)
+------+-------+
|city  |bigCity|
+------+-------+
|London|Y      |
|Austin|N      |
+------+-------+



2 Comments

Thanks! Is it a problem for Spark and the proposed approach that the bigCities DataFrame can be quite big? Let's say it is based on a 9 GB CSV file.
The file can be big, but you will be passing only one column to the collect_list() function. Also, I suppose there will be duplicates, so you can use collect_set() instead of collect_list(), which will be more efficient. Let me update the answer!
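The efficiency point in this comment can be illustrated with plain Scala collections (a minimal sketch; the `names` sample is hypothetical): collect_set drops duplicates before the membership array is built, so the collected value stays as small as the number of distinct big cities, regardless of how many rows the file has.

```scala
// One column read from a (possibly huge) file, with duplicates.
val names = Seq("London", "Cairo", "London", "London")

val asList = names          // collect_list keeps one entry per row
val asSet  = names.distinct // collect_set keeps one entry per distinct value

println(asList.size) // 4
println(asSet.size)  // 2
// Membership answers are identical either way.
println(asSet.contains("Cairo") == asList.contains("Cairo")) // true
```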
