3

Or just to make this simple t understand I have a data frame .

DataPartition   TimeStamp   OrganizationID  SourceID    AuditorID   AuditorEnumerationId    AuditorOpinionCode  AuditorOpinionId    IsPlayingAuditorRole    IsPlayingCSRAuditorRole IsPlayingTaxAdvisorRole FFAction|!| AuditorOpinionOnInternalControlCode AuditorOpinionOnGoingConcernCode    AuditorOpinionOnInternalControlsId  AuditorOpinionOnGoingConcernId  rank
Japan   2018-05-03T09:52:48+00:00   4295876589  194 null    null    null    null    null    null    null    O|!|    null    null    null    null    1
Japan   2018-05-03T09:52:48+00:00   4295876589  194 2719    3023331 AOP 3010542 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-03T09:52:48+00:00   4295876589  195 16157   1002485247  UWE 3010547 true    false   false   O|!|    null    null    null    null    1
Japan   2018-05-03T07:36:47+00:00   4295876589  196 3252    3024053 ONC 3020538 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-03T07:36:47+00:00   4295876589  195 5937    3026578 NOP 3010543 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-02T10:37:50+00:00   4295876589  156 null    null    null    null    null    null    null    O|!|    null    null    null    null    1
Japan   2018-05-02T10:37:50+00:00   4295876589  157 null    null    null    null    null    null    null    O|!|    null    null    null    null    1
Japan   2018-05-02T10:37:56+00:00   4295876589  193 null    null    null    null    null    null    null    O|!|    null    null    null    null    1
Japan   2018-05-03T08:10:19+00:00   4295876589  196 null    null    null    null    null    null    null    D|!|    null    null    null    null    1
Japan   2018-05-03T09:52:48+00:00   4295876589  195 null    null    null    null    null    null    null    O|!|    null    null    null    null    1

Now i need to selecet columns which has Rank =1 and AuditorID!=null but AuditorID =!=null will be applicable only for FFAction|!|="O".

In that case my output data frame should be like below

DataPartition   TimeStamp   OrganizationID  SourceID    AuditorID   AuditorEnumerationId    AuditorOpinionCode  AuditorOpinionId    IsPlayingAuditorRole    IsPlayingCSRAuditorRole IsPlayingTaxAdvisorRole FFAction|!| AuditorOpinionOnInternalControlCode AuditorOpinionOnGoingConcernCode    AuditorOpinionOnInternalControlsId  AuditorOpinionOnGoingConcernId  rank

Japan   2018-05-03T09:52:48+00:00   4295876589  194 2719    3023331 AOP 3010542 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-03T09:52:48+00:00   4295876589  195 16157   1002485247  UWE 3010547 true    false   false   O|!|    null    null    null    null    1
Japan   2018-05-03T07:36:47+00:00   4295876589  196 3252    3024053 ONC 3020538 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-03T07:36:47+00:00   4295876589  195 5937    3026578 NOP 3010543 true    false   true    O|!|    null    null    null    null    1
Japan   2018-05-02T10:37:56+00:00   4295876589  193 null    null    null    null    null    null    null    I|!|    null    null    null    null    1
Japan   2018-05-03T08:10:19+00:00   4295876589  196 null    null    null    null    null    null    null    D|!|    null    null    null    null    1

Here is the my code

import org.apache.spark.sql.expressions._
    val windowSpec = Window.partitionBy("OrganizationID", "SourceID", "AuditorID").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
    val latestForEachKey1 = finaldf.withColumn("rank", row_number().over(windowSpec))
    .filter($"rank" === 1 && $"AuditorID" =!= "null")

Scenario 2 ...

Here is my data frame

uniqueFundamentalSet    PeriodId    SourceId    StatementTypeCode   StatementCurrencyId UpdateReason_updateReasonId UpdateReasonComment UpdateReasonComment_languageId  UpdateReasonEnumerationId   FFAction|!| DataPartition   PartitionYear   TimeStamp
192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T09:57:29+00:00
192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00
192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00
192730230775    310 182 INC 500186  null    null    null    null    O|!|    Japan   2018    2018-05-10T08:30:53+00:00

When i apply code suggested iin answer i get below output

val windowSpec = Window.partitionBy("uniqueFundamentalSet", "PeriodId", "SourceId", "StatementTypeCode", "StatementCurrencyId", "UpdateReason_updateReasonId").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
    val latestForEachKey1 = tempReorder.withColumn("rank", row_number().over(windowSpec))
      .filter($"rank" === 1 && (($"UpdateReason_updateReasonId" =!= "null" && $"FFAction|!|" === "O|!|") || $"FFAction|!|" =!= "O|!|")).drop("rank")

192730230775    297 182 INC 500186  6   UpdateReasonToDelete    505074  3019685 I|!|    Japan   2017    2018-05-10T10:00:40+00:00

But my expected output is this .

192730230775    297 182 INC 500186  null    null    null    null    O|!|    Japan   2017    2018-05-10T10:11:15+00:00
6
  • You want to check for only AuditorID? Commented May 10, 2018 at 9:35
  • @RameshMaharjan yes if DataPartition|OrganizationID|SourceID this columns are matching Commented May 10, 2018 at 9:35
  • then include that column in window function in orderby but in descending order Commented May 10, 2018 at 9:36
  • @RameshMaharjan let me try ..I mean i have to get latest in case i get Delete and Overwrite based on only two columns in that case AuditorId should not be added Commented May 10, 2018 at 9:39
  • 1
    Possible duplicate of Getting latest based on column condition in spark scala is not working Commented May 10, 2018 at 9:40

2 Answers 2

1

here's the working code for you

import org.apache.spark.sql.expressions._
val windowSpec = Window.partitionBy("OrganizationID", "SourceID", "AuditorID").orderBy(unix_timestamp($"TimeStamp", "yyyy-MM-dd'T'HH:mm:ss").cast("timestamp").desc)
val latestForEachKey1 = finaldf.withColumn("rank", row_number().over(windowSpec))
  .filter($"rank" === 1 && (($"AuditorID" =!= "null" && $"FFAction|!|" === "O|!|") || $"FFAction|!|" =!= "O|!|"))

which should give you

+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+----+
|DataPartition|TimeStamp                |OrganizationID|SourceID|AuditorID|AuditorEnumerationId|AuditorOpinionCode|AuditorOpinionId|IsPlayingAuditorRole|IsPlayingCSRAuditorRole|IsPlayingTaxAdvisorRole|FFAction|!||AuditorOpinionOnInternalControlCode|AuditorOpinionOnGoingConcernCode|AuditorOpinionOnInternalControlsId|AuditorOpinionOnGoingConcernId|rank|
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+----+
|Japan        |2018-05-03T09:52:48+00:00|4295876589    |194     |2719     |3023331             |AOP               |3010542         |true                |false                  |true                   |O|!|       |null                               |null                            |null                              |null                          |1   |
|Japan        |2018-05-03T09:52:48+00:00|4295876589    |195     |16157    |1002485247          |UWE               |3010547         |true                |false                  |false                  |O|!|       |null                               |null                            |null                              |null                          |1   |
|Japan        |2018-05-03T07:36:47+00:00|4295876589    |196     |3252     |3024053             |ONC               |3020538         |true                |false                  |true                   |O|!|       |null                               |null                            |null                              |null                          |1   |
|Japan        |2018-05-03T07:36:47+00:00|4295876589    |195     |5937     |3026578             |NOP               |3010543         |true                |false                  |true                   |O|!|       |null                               |null                            |null                              |null                          |1   |
|Japan        |2018-05-03T08:10:19+00:00|4295876589    |196     |null     |null                |null              |null            |null                |null                   |null                   |D|!|       |null                               |null                            |null                              |null                          |1   |
+-------------+-------------------------+--------------+--------+---------+--------------------+------------------+----------------+--------------------+-----------------------+-----------------------+-----------+-----------------------------------+--------------------------------+----------------------------------+------------------------------+----+

Note : record with sourceID 193 has o|!| and null so it shouldn't be in the output

Sign up to request clarification or add additional context in comments.

6 Comments

Hello sir ...Can you please look one scenario ...It is failing here ...Its because i of the Action type ..
I will explain you the issue sir ..I am not creating another question because people will mark it as duplicate ...The issue is Even though 182 UpdateReason_updateReasonId has null value but it has greatest timestamp ..So i need actually the one with latest timestamp ..If timestamp are same then i need to pick one which has null as mentioned value and |O|..I hope i make seance ..That us what i tried to explain in the first scenario ..
@RameshMaharjan Yes that's the case ..So first preference will be for latest timestamp ..In case if Timestamp is same then we need to consider The one which does not have UpdateReason_updateReasonId as null and O..
@SUDARSHAN the reason is that you have included UpdateReason_updateReasonId in the partitionBy which made the first three lines into two groups. so the record with null and latest date is never in the same group. all you have to do is remove UpdateReason_updateReasonId from the partitionBy and you should be fine
@RameshMaharjan In that case if UpdateReason_updateReasonId is different for and Other than all columns are same then other record will be discarded ..My primary key is combination of columns till UpdateReason_updateReasonId from OrgnizationId..
|
0

You can use rownum udf to drop duplicates and check is rownum =1 and authorid is not null

1 Comment

No it wont work all the time .Suppose if i get only one row then rank will be 1 and Auditor id will be null ..In that case i want to retain that row ..

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.