4

I have two dataframes, df1 and df2:

df1.show()
+---+--------+-----+----+--------+
|cA |   cB   |  cC | cD |   cE   |
+---+--------+-----+----+--------+
|  A|   abc  | 0.1 | 0.0|   0    |
|  B|   def  | 0.15| 0.5|   0    |
|  C|   ghi  | 0.2 | 0.2|   1    |
|  D|   jkl  | 1.1 | 0.1|   0    |
|  E|   mno  | 0.1 | 0.1|   0    |
+---+--------+-----+----+--------+


df2.show()
+---+--------+-----+----+--------+
|cA |   cB   |  cH | cI |   cJ   |
+---+--------+-----+----+--------+
|  A|   abc  | a   | b  |   ?    |
|  C|   ghi  | a   | c  |   ?    |
+---+--------+-----+----+--------+

I would like to update the cE column in df1 and set it to 1 if the row is referenced in df2. Each record is identified by the cA and cB columns.

Below is the desired output. Note that the cE value of the first record was updated to 1:

+---+--------+-----+----+--------+
|cA |   cB   |  cC | cD |   cE   |
+---+--------+-----+----+--------+
|  A|   abc  | 0.1 | 0.0|   1    |
|  B|   def  | 0.15| 0.5|   0    |
|  C|   ghi  | 0.2 | 0.2|   1    |
|  D|   jkl  | 1.1 | 0.1|   0    |
|  E|   mno  | 0.1 | 0.1|   0    |
+---+--------+-----+----+--------+
8
  • The algorithm looks pretty straightforward: do a join, then modify the value of the cE column depending on the join result for each row. Have you tried anything yourself, or do you just want someone to give you a solution? Commented Jul 15, 2019 at 9:10
  • @VladislavVarslavans I know how to join the dataframes as a first step: df1.alias('a').join(df2, (df1.cA == df2.cA) & (df1.cB==df2.cB), how='left').select('a.*') but I am not sure how to do the last step. Commented Jul 15, 2019 at 9:31
  • There is a withColumn API in Spark. It creates a new column, and you can use values from other columns to fill it. After that you can do another select with the columns you require. Commented Jul 15, 2019 at 9:34
  • @VladislavVarslavans I don't want to create a new column but update an existing one. Commented Jul 15, 2019 at 9:35
  • You can't modify a column in place, but you can create a new column and then discard the old one. Using the withColumn API, create a new column based on the old cE value and the result of the join. Then select all relevant columns, including the new one but excluding the old one. Commented Jul 15, 2019 at 9:44

4 Answers

5

When you need to update a column's value based on another column, the when clause comes in handy. Please refer to the when and otherwise functions.

import pyspark.sql.functions as F
# Left join keeps all df1 rows; unmatched rows keep their existing cE.
cond = (df1.cA == df2.cA) & (df1.cB == df2.cB)
df3 = (df1.join(df2, cond, "left")
          .withColumn('cE', F.when(df2.cA.isNotNull(), 1).otherwise(df1.cE))
          .select(df1.cA, df1.cB, df1.cC, df1.cD, 'cE'))
df3.show()
+---+---+----+---+---+
| cA| cB|  cC| cD| cE|
+---+---+----+---+---+
|  E|mno| 0.1|0.1|  0|
|  B|def|0.15|0.5|  0|
|  C|ghi| 0.2|0.2|  1|
|  A|abc| 0.1|0.0|  1|
|  D|jkl| 1.1|0.1|  0|
+---+---+----+---+---+

3

Here is my answer.

It's Scala code, sorry for that; I don't have Python installed. Hopefully it helps.

import org.apache.spark.sql._
import org.apache.spark.sql.functions._

val ss = SparkSession.builder().master("local").getOrCreate()

import ss.implicits._

val seq1 = Seq(
  ("A", "abc", 0.1, 0.0, 0),
  ("B", "def", 0.15, 0.5, 0),
  ("C", "ghi", 0.2, 0.2, 1),
  ("D", "jkl", 1.1, 0.1, 0),
  ("E", "mno", 0.1, 0.1, 0)
)

val seq2 = Seq(
  ("A", "abc", "a", "b", "?"),
  ("C", "ghi", "a", "c", "?")
)


val df1 = ss.sparkContext.makeRDD(seq1).toDF("cA", "cB", "cC", "cD", "cE")
val df2 = ss.sparkContext.makeRDD(seq2).toDF("cA", "cB", "cH", "cI", "cJ")


val joined = df1.join(df2, (df1("cA") === df2("cA")).and(df1("cB") === df2("cB")), "left")

val res = joined.withColumn("newCe",
  when(df2("cA").isNull.and(joined("cE") === lit(0)), lit(0)).otherwise(lit(1)))


res.select(df1("cA"), df1("cB"), df1("cC"), df1("cD"), res("newCe"))
  .withColumnRenamed("newCe", "cE")
  .show

And the output for me is:

+---+---+----+---+---+
| cA| cB|  cC| cD| cE|
+---+---+----+---+---+
|  E|mno| 0.1|0.1|  0|
|  B|def|0.15|0.5|  0|
|  C|ghi| 0.2|0.2|  1|
|  A|abc| 0.1|0.0|  1|
|  D|jkl| 1.1|0.1|  0|
+---+---+----+---+---+


0

You can do what you want using join:

import pandas as pd

df1 = pd.DataFrame({ 'cA' : ['A', 'B', 'C', 'D', 'E'], 'cB' : ['abc', 'def', 'ghi', 'jkl', 'mno'], 'cE' : [0,0,1, 0, 0]})
df2 = pd.DataFrame({ 'cA' : ['A', 'C'], 'cB' : ['abc', 'ghi'], 'cE' : ['?','?']})

# join
df = df1.join(df2.set_index(['cA', 'cB']),  lsuffix='_df1', rsuffix='_df2', on=['cA', 'cB'])

# NaN in cE_df2 marks df1 rows that have no match in df2
df.loc[~df['cE_df2'].isna(), 'cE_df2'] = 1
df.loc[df['cE_df2'].isna(), 'cE_df2'] = 0

df1['cE'] = df['cE_df2']

Output:

    cA  cB  cE
0   A   abc 1
1   B   def 0
2   C   ghi 1
3   D   jkl 0
4   E   mno 0

1 Comment

I am not using pandas dataframes.
0

Try this:

for i in df2.values:
    df1.loc[(df1.cA==i[0]) & (df1.cB == i[1]),['cE']] = 1

1 Comment

I am not using pandas dataframes.
