
I have 3 dataframes in Spark: dataframe1, dataframe2 and dataframe3.

I want to join dataframe1 with another dataframe based on a condition.

I use the following code:

Dataset <Row> df= dataframe1.filter(when(col("diffDate").lt(3888),dataframe1.join(dataframe2,
            dataframe2.col("id_device").equalTo(dataframe1.col("id_device")).
            and(dataframe2.col("id_vehicule").equalTo(dataframe1.col("id_vehicule"))).
            and(dataframe2.col("tracking_time").lt(dataframe1.col("tracking_time")))).orderBy(dataframe2.col("tracking_time").desc())).
                   otherwise(dataframe1.join(dataframe3,
                   dataframe3.col("id_device").equalTo(dataframe1.col("id_device")).
                           and(dataframe3.col("id_vehicule").equalTo(dataframe1.col("id_vehicule"))).
                           and(dataframe3.col("tracking_time").lt(dataframe1.col("tracking_time")))).orderBy(dataframe3.col("tracking_time").desc())));

But I get this exception:

Exception in thread "main" java.lang.RuntimeException: Unsupported literal type class org.apache.spark.sql.Dataset

EDIT

Input dataframes:

dataframe1

+-----------+-------------+-------------+-------------+
| diffDate  |id_device    |id_vehicule  |tracking_time|
+-----------+-------------+-------------+-------------+
|222        |1            |5            |2020-05-30   |          
|4700       |8            |9            |2019-03-01   |
+-----------+-------------+-------------+-------------+

dataframe2

+-----------+-------------+-------------+-------------+
|id_device  |id_vehicule  |tracking_time|longitude    |
+-----------+-------------+-------------+-------------+
|1          |5            |2020-05-12   | 33.21111    |       
|8          |9            |2019-03-01   |20.2222      |
+-----------+-------------+-------------+-------------+

dataframe3

+-----------+-------------+-------------+-------------+
|id_device  |id_vehicule  |tracking_time|latitude     |
+-----------+-------------+-------------+-------------+
|1          |5            |2020-05-12   | 40.333      |       
|8          |9            |2019-02-28   |2.00000      |
+-----------+-------------+-------------+-------------+

Expected output when diffDate < 3888:

+---------+---------+-----------+-------------+---------+-----------+-------------+---------+
|diffDate |id_device|id_vehicule|tracking_time|id_device|id_vehicule|tracking_time|longitude|
+---------+---------+-----------+-------------+---------+-----------+-------------+---------+
|222      |1        |5          |2020-05-30   |1        |5          |2020-05-12   |33.21111 |
+---------+---------+-----------+-------------+---------+-----------+-------------+---------+

Expected output when diffDate >= 3888:

+---------+---------+-----------+-------------+---------+-----------+-------------+---------+
|diffDate |id_device|id_vehicule|tracking_time|id_device|id_vehicule|tracking_time|latitude |
+---------+---------+-----------+-------------+---------+-----------+-------------+---------+
|4700     |8        |9          |2019-03-01   |8        |9          |2019-02-28   |2.00000  |
+---------+---------+-----------+-------------+---------+-----------+-------------+---------+

I need your help

Thank you.

  • can you post a sample input and expected output? Commented Jul 8, 2020 at 6:08

1 Answer


I think you need to revisit your code.

You are trying to execute a join for each row of dataframe1 (based on the condition), which I think is either an incorrect or a misunderstood requirement.

The when(condition, then).otherwise(else) function is evaluated for each row of the underlying dataframe and is generally used to compute a column based on a condition. The then and otherwise clauses only accept column expressions over existing columns (primitive/complex types) and literals; you can't put a dataframe, or any operation that outputs a dataframe, there.
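For example, when/otherwise can only choose between per-row values. A minimal sketch (the new column name "range" and its labels are just for illustration):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.*;

// when/otherwise picks a value per row (a column expression or a literal);
// it can never return a Dataset, which is why your code fails with
// "Unsupported literal type class org.apache.spark.sql.Dataset".
Dataset<Row> flagged = dataframe1.withColumn("range",
        when(col("diffDate").lt(3888), lit("recent"))
                .otherwise(lit("old")));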

Maybe your requirement is to join dataframe1 with dataframe2 for the rows where col("diffDate").lt(3888). To achieve this you can do the following:

Dataset<Row> joined = dataframe1.join(dataframe2,
                dataframe2.col("id_device").equalTo(dataframe1.col("id_device")).
                        and(dataframe2.col("id_vehicule").equalTo(dataframe1.col("id_vehicule"))).
                        and(dataframe2.col("tracking_time").lt(dataframe1.col("tracking_time"))).
                        // push the diffDate filter into the join condition
                        and(dataframe1.col("diffDate").lt(3888))
                )
                        .orderBy(dataframe2.col("tracking_time").desc());
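Here the diffDate condition is part of the join condition itself, so rows of dataframe1 with diffDate >= 3888 find no match and are simply dropped from the inner join.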

Edit-1


Dataset<Row> result =
        // diffDate < 3888: join with dataframe2 and take its longitude
        dataframe1.as("a").join(dataframe2.as("b"),
                dataframe2.col("id_device").equalTo(dataframe1.col("id_device")).
                        and(dataframe2.col("id_vehicule").equalTo(dataframe1.col("id_vehicule"))).
                        and(dataframe2.col("tracking_time").lt(dataframe1.col("tracking_time"))).
                        and(dataframe1.col("diffDate").lt(3888))
        ).selectExpr("a.*", "b.longitude", "null as latitude")
                .unionByName(
                        // diffDate >= 3888: join with dataframe3 and take its latitude
                        dataframe1.as("a").join(dataframe3.as("c"),
                                dataframe3.col("id_device").equalTo(dataframe1.col("id_device")).
                                        and(dataframe3.col("id_vehicule").equalTo(dataframe1.col("id_vehicule"))).
                                        and(dataframe3.col("tracking_time").lt(dataframe1.col("tracking_time"))).
                                        and(dataframe1.col("diffDate").geq(3888))
                        ).selectExpr("a.*", "c.latitude", "null as longitude")
                );
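Both branches are computed as separate joins and then combined with unionByName, so the result is a single dataframe: each row of dataframe1 is matched against dataframe2 when diffDate < 3888 and against dataframe3 otherwise, with the column that doesn't apply filled with null.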

1 Comment

Thank you for your reply. Note that I wanted a single dataframe that applies the condition, so that I can filter it afterwards. Is there no possibility to do that?
