
I have a Spark DataFrame as below:

INPUT

+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+------+---------+------+--------+----------+----------+
| accountId|accountNumber|acctNumberTypeCode|cisDivision|currencyCode|priceItemCd|priceItemParam|priceItemParamCode|processingDate|txnAmt|  txnDttm|txnVol|udfChar1|  udfChar2|  udfChar3|
+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+------+---------+------+--------+----------+----------+
|2032000000|   2032000000|          C1_F_ANO|         CA|         USD| PRICEITEM2|            UK|           Country|    2018-06-06|   100|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|
|2032000000|   2032000000|          C1_F_ANO|         CA|         USD| PRICEITEM2|            UK|           Country|    2018-06-06|   100|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|
|1322000000|   1322000000|          C1_F_ANO|         CA|         USD| PRICEITEM1|            US|           Country|    2018-06-06|   100|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|
|1322000000|   1322000000|          C1_F_ANO|         CA|         USD| PRICEITEM1|            US|           Country|    2018-06-06|   100|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|

Now I want to:

  1. Sum the "txnAmt" column for records having the same accountId and accountNumber.
  2. Drop the duplicate records.

Output

+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+------+---------+------+--------+----------+----------+
| accountId|accountNumber|acctNumberTypeCode|cisDivision|currencyCode|priceItemCd|priceItemParam|priceItemParamCode|processingDate|txnAmt|  txnDttm|txnVol|udfChar1|  udfChar2|  udfChar3|
+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+------+---------+------+--------+----------+----------+
|2032000000|   2032000000|          C1_F_ANO|         CA|         USD| PRICEITEM2|            UK|           Country|    2018-06-06|   200|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|
|1322000000|   1322000000|          C1_F_ANO|         CA|         USD| PRICEITEM1|            US|           Country|    2018-06-06|   200|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2|

I am not sure how to perform step 1.

I have written code to perform step 2, dropping the duplicates based on accountId and accountNumber:

// keeps one (arbitrary) row per accountId/accountNumber combination
String[] colNames = {"accountId", "accountNumber"};
Dataset<RuleOutputParams> finalDs = rulesParamDS.dropDuplicates(colNames);

Can anyone help?

  • Can two rows have the same accountId and accountNumber but different values in other columns? Commented Jun 6, 2018 at 6:51
  • You're not dropping duplicates... You are aggregating them Commented Jun 6, 2018 at 6:55
  • @Shaido yes it is possible. Commented Jun 6, 2018 at 6:59
  • So when you do the aggregation (or dropping), if for example the txnDttm has different values while accountId and accountNumber are the same. How do you know which value of txnDttm to keep? Both? Commented Jun 6, 2018 at 7:03
  • @cricket_007 finalDS has only unique rows based on the values of accountId and accountNumber, so I thought it had dropped the remaining rows. Can you please point out something that can lead me to complete the requirement? Commented Jun 6, 2018 at 7:03

1 Answer


Load the data and register it as a temporary view so it can be queried with SQL:

val df = spark.read.format("csv").option("header", true).load("data.csv")
df.createOrReplaceTempView("t")

Then, what you need are window aggregation functions, plus a trick with row_number() to remove the duplicates:

val df2 = spark.sql("""SELECT * FROM (
  SELECT *, 
    sum(txnAmt) OVER (PARTITION BY accountId, accountNumber) s, 
    row_number() OVER (PARTITION BY accountId, accountNumber ORDER BY processingDate) r FROM t) 
  WHERE r=1""")
  .drop("txnAmt", "r")
  .withColumnRenamed("s", "txnAmt")

And if you show that, you'll see

+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+---------+------+--------+----------+----------+------+
| accountId|accountNumber|acctNumberTypeCode|cisDivision|currencyCode|priceItemCd|priceItemParam|priceItemParamCode|processingDate|  txnDttm|txnVol|udfChar1|  udfChar2|  udfChar3|txnAmt|
+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+---------+------+--------+----------+----------+------+
|2032000000|   2032000000|          C1_F_ANO|         CA|         USD| PRICEITEM2|            UK|           Country|    2018-06-06|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2| 200.0|
|1322000000|   1322000000|          C1_F_ANO|         CA|         USD| PRICEITEM1|            US|           Country|    2018-06-06|28-MAY-18|   100|   TYPE1|PRICEITEM1|PRICEITEM2| 200.0|
+----------+-------------+------------------+-----------+------------+-----------+--------------+------------------+--------------+---------+------+--------+----------+----------+------+
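Since the question itself uses the Java Dataset API, the same window logic can also be written without a SQL string. This is only a minimal sketch, assuming df is the loaded Dataset<Row> (the variable names here are illustrative, not from the question):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import static org.apache.spark.sql.functions.*;

// Sum txnAmt over each accountId/accountNumber group, then keep one row per group.
WindowSpec byAccount = Window.partitionBy("accountId", "accountNumber");

Dataset<Row> result = df
    .withColumn("s", sum("txnAmt").over(byAccount))                           // group-wide sum
    .withColumn("r", row_number().over(byAccount.orderBy("processingDate")))  // 1, 2, ... within each group
    .filter(col("r").equalTo(1))                                              // keep only the first row per group
    .drop("txnAmt", "r")
    .withColumnRenamed("s", "txnAmt");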

As a side note, one might also add more columns to the following query, but they would then need to be added to the GROUP BY clause as well (an example follows the output below).

spark.sql("SELECT accountId, accountNumber, SUM(txnAmt) txnAmt FROM t GROUP BY accountId, accountNumber").show
+----------+-------------+------+
| accountId|accountNumber|txnAmt|
+----------+-------------+------+
|2032000000|   2032000000| 200.0|
|1322000000|   1322000000| 200.0|
+----------+-------------+------+
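For example, if you also wanted to keep processingDate (chosen here purely for illustration), it would have to appear in both the select list and the GROUP BY. In Java, that could look like:

// Every non-aggregated column in the SELECT must also be listed in the GROUP BY.
spark.sql("SELECT accountId, accountNumber, processingDate, SUM(txnAmt) AS txnAmt "
        + "FROM t GROUP BY accountId, accountNumber, processingDate").show();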

Comments

Thanks a lot for the explanation. However, when I wrote val df2 = spark.sql("""SELECT * FROM ( SELECT *, sum(txnAmt) OVER (PARTITION BY accountId, accountNumber) s, row_number() OVER (PARTITION BY accountId, accountNumber ORDER BY processingDate) r FROM t) WHERE r=1""") .drop("txnAmt", "r") .withColumnRenamed("s", "txnAmt") in my Java code, it gave a compiler error.
Java doesn't use triple quotes like Scala does, or the val keyword.
Yes, I tried with a single pair of quotes and the compiler error was resolved. Thanks.
When I write the final DF to a CSV file, instead of creating one CSV file it creates different CSV files for different rows (records). Why is that?
Spark is distributed over many CPUs and computers. When you do write("file.csv"), it makes a directory. One file per Spark partition. forums.databricks.com/questions/2848/…
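If you really need a single CSV file and the result is small enough, one workaround is to reduce the output to one partition before writing. A minimal Java sketch, with an illustrative output path:

// coalesce(1) pulls everything into a single partition, so the output
// directory will contain just one part-*.csv file.
finalDs.coalesce(1)
       .write()
       .option("header", "true")
       .csv("/tmp/output");  // path is illustrative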
