16

Q: Is there is any way to merge two dataframes or copy a column of a dataframe to another in PySpark?

For example, I have two Dataframes:

DF1              
C1                    C2                                                        
23397414             20875.7353   
5213970              20497.5582   
41323308             20935.7956   
123276113            18884.0477   
76456078             18389.9269 

the seconde dataframe

DF2
C3                       C4
2008-02-04               262.00                 
2008-02-05               257.25                 
2008-02-06               262.75                 
2008-02-07               237.00                 
2008-02-08               231.00 

Then i want to add C3 of DF2 to DF1 like this:

New DF              
    C1                    C2          C3                                              
    23397414             20875.7353   2008-02-04
    5213970              20497.5582   2008-02-05
    41323308             20935.7956   2008-02-06
    123276113            18884.0477   2008-02-07
    76456078             18389.9269   2008-02-08

I hope this example was clear.

0

10 Answers 10

16

rownum + window function i.e solution 1 or zipWithIndex.map i.e solution 2 should help in this case.

Solution 1 : You can use window functions to get this kind of

Then I would suggest you to add rownumber as additional column name to Dataframe say df1.

  DF1              
    C1                    C2                 columnindex                                             
    23397414             20875.7353            1
    5213970              20497.5582            2
    41323308             20935.7956            3
    123276113            18884.0477            4
    76456078             18389.9269            5

the second dataframe

DF2
C3                       C4             columnindex
2008-02-04               262.00            1        
2008-02-05               257.25            2      
2008-02-06               262.75            3      
2008-02-07               237.00            4          
2008-02-08               231.00            5

Now .. do inner join of df1 and df2 that's all... you will get below ouput

something like this

from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()

df1 = ....  // as showed above df1

df2 = ....  // as shown above df2


df11 =  df1.withColumn("columnindex", rowNumber().over(w))
  df22 =  df2.withColumn("columnindex", rowNumber().over(w))

newDF = df11.join(df22, df11.columnindex == df22.columnindex, 'inner').drop(df22.columnindex)
newDF.show()



New DF              
    C1                    C2          C3                                              
    23397414             20875.7353   2008-02-04
    5213970              20497.5582   2008-02-05
    41323308             20935.7956   2008-02-06
    123276113            18884.0477   2008-02-07
    76456078             18389.9269   2008-02-08

Solution 2 : Another good way(probably this is best :)) in scala, which you can translate to pyspark :

/**
* Add Column Index to dataframe 
*/
def addColumnIndex(df: DataFrame) = sqlContext.createDataFrame(
  // Add Column index
  df.rdd.zipWithIndex.map{case (row, columnindex) => Row.fromSeq(row.toSeq :+ columnindex)},
  // Create schema
  StructType(df.schema.fields :+ StructField("columnindex", LongType, false))
)

// Add index now...
val df1WithIndex = addColumnIndex(df1)
val df2WithIndex = addColumnIndex(df2)

 // Now time to join ...
val newone = df1WithIndex
  .join(df2WithIndex , Seq("columnindex"))
  .drop("columnindex")
Sign up to request clarification or add additional context in comments.

3 Comments

use the below. rowNumber is no longer being used from pyspark.sql.window import Window from pyspark.sql.functions import row_number
also row_number doesn't work with a blank orderBy()
try to follow solution 2 in python. or below pyspark solution which was translated. second approach in scala should work
10

I thought I would share the python (pyspark) translation for answer #2 above from @Ram Ghadiyaram:

from pyspark.sql.functions import col
def addColumnIndex(df): 
  # Create new column names
  oldColumns = df.schema.names
  newColumns = oldColumns + ["columnindex"]

  # Add Column index
  df_indexed = df.rdd.zipWithIndex().map(lambda (row, columnindex): \
                                         row + (columnindex,)).toDF()

  #Rename all the columns
  new_df = reduce(lambda data, idx: data.withColumnRenamed(oldColumns[idx], 
                  newColumns[idx]), xrange(len(oldColumns)), df_indexed)   
  return new_df

# Add index now...
df1WithIndex = addColumnIndex(df1)
df2WithIndex = addColumnIndex(df2)

#Now time to join ...
newone = df1WithIndex.join(df2WithIndex, col("columnindex"),
                           'inner').drop("columnindex")

Comments

9

for python3 version,

from pyspark.sql.types import StructType, StructField, LongType

def with_column_index(sdf): 
    new_schema = StructType(sdf.schema.fields + [StructField("ColumnIndex", LongType(), False),])
    return sdf.rdd.zipWithIndex().map(lambda row: row[0] + (row[1],)).toDF(schema=new_schema)

df1_ci = with_column_index(df1)
df2_ci = with_column_index(df2)
join_on_index = df1_ci.join(df2_ci, df1_ci.ColumnIndex == df2_ci.ColumnIndex, 'inner').drop("ColumnIndex")

1 Comment

this worked for me on Spark 3.1, py 3, scala 2...thanks!
4

I referred to his(@Jed) answer

from pyspark.sql.functions import col
def addColumnIndex(df): 
    # Get old columns names and add a column "columnindex"
    oldColumns = df.columns
    newColumns = oldColumns + ["columnindex"]

    # Add Column index
    df_indexed = df.rdd.zipWithIndex().map(lambda (row, columnindex): \
                                         row + (columnindex,)).toDF()
    #Rename all the columns
    oldColumns = df_indexed.columns  
    new_df = reduce(lambda data, idx:data.withColumnRenamed(oldColumns[idx], 
                  newColumns[idx]), xrange(len(oldColumns)), df_indexed)   
    return new_df

# Add index now...
df1WithIndex = addColumnIndex(df1)
df2WithIndex = addColumnIndex(df2)

#Now time to join ...
newone = df1WithIndex.join(df2WithIndex, col("columnindex"),
                           'inner').drop("columnindex")

1 Comment

What is the difference from Jed's answer? If there is a difference, it should be explained, if not, this should not be posted as an answer..
4

This answer solved it for me:

import pyspark.sql.functions as sparkf

# This will return a new DF with all the columns + id
res = df.withColumn('id', sparkf.monotonically_increasing_id())

Credit to Arkadi T

3 Comments

This isn't going to work for joining two dataframes. The monotonically_increasing_id function doesn't return consecutive numbers. No guarantee the two dataframes will assign the same integer to rows in each df
I don't agree that the index could be different in the two dataframes. Please have a look at my code below
@justincress that is true, to guarantee the same id one should add .coalesce(1) before using the monotonically_increasing_id
1

Here is an simple example that can help you even if you have already solve the issue.

  //create First Dataframe
  val df1 = spark.sparkContext.parallelize(Seq(1,2,1)).toDF("lavel1")

  //create second Dataframe
  val df2 = spark.sparkContext.parallelize(Seq((1.0, 12.1), (12.1, 1.3), (1.1, 0.3))). toDF("f1", "f2")

  //Combine both dataframe
  val combinedRow = df1.rdd.zip(df2.rdd). map({
    //convert both dataframe to Seq and join them and return as a row
    case (df1Data, df2Data) => Row.fromSeq(df1Data.toSeq ++ df2Data.toSeq)
  })
//  create new Schema from both the dataframe's schema
  val combinedschema =  StructType(df1.schema.fields ++ df2.schema.fields)

//  Create a new dataframe from new row and new schema
  val finalDF = spark.sqlContext.createDataFrame(combinedRow, combinedschema)

  finalDF.show

1 Comment

Note : Caused by: org.apache.spark.SparkException: Can only zip RDDs with same number of elements in each partition
1

To merge columns from two different dataframe you have first to create a column index and then join the two dataframes. Indeed, two dataframes are similar to two SQL tables. To make a connection you have to join them.

If you don't care about the final order of the rows you can generate the index column with monotonically_increasing_id().

Using the following code you can check that monotonically_increasing_id generates the same index column in both dataframes (at least up to a billion of rows), so you won't have any error in the merged dataframe.

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

sample_size = 1E9

sdf1 = spark.range(1, sample_size).select(F.col("id").alias("id1"))
sdf2 = spark.range(1, sample_size).select(F.col("id").alias("id2"))

sdf1 = sdf1.withColumn("idx", sf.monotonically_increasing_id())
sdf2 = sdf2.withColumn("idx", sf.monotonically_increasing_id())

sdf3 = sdf1.join(sdf2, 'idx', 'inner')
sdf3 = sdf3.withColumn("diff", F.col("id1")-F.col("id2")).select("diff")
sdf3.filter(F.col("diff") != 0 ).show()

Comments

0

Expanding on Jed's answer, in response to Ajinkya's comment:

To get the same old column names, you need to replace "old_cols" with a column list of the newly named indexed columns. See my modified version of the function below

def add_column_index(df):
    new_cols = df.schema.names + ['ix']
    ix_df = df.rdd.zipWithIndex().map(lambda (row, ix): row + (ix,)).toDF()
    tmp_cols = ix_df.schema.names
    return reduce(lambda data, idx: data.withColumnRenamed(tmp_cols[idx], new_cols[idx]), xrange(len(tmp_cols)), ix_df)

Comments

0

Not the better way performance wise.

df3=df1.crossJoin(df2).show(3)

1 Comment

This will create all combinations of rows, not what OP wanted
-1

You can use a combination of monotonically_increasing_id (guaranteed to always be increasing) and row_number (guaranteed to always give the same sequence). You cannot use row_number alone because it needs to be ordered by something. So here we order by monotonically_increasing_id. I am using Spark 2.3.1 and Python 2.7.13.

from pandas import DataFrame
from pyspark.sql.functions import (
    monotonically_increasing_id,
    row_number)
from pyspark.sql import Window

DF1 = spark.createDataFrame(DataFrame({
    'C1': [23397414, 5213970, 41323308, 123276113, 76456078],
    'C2': [20875.7353, 20497.5582, 20935.7956, 18884.0477, 18389.9269]}))

DF2 = spark.createDataFrame(DataFrame({
'C3':['2008-02-04', '2008-02-05', '2008-02-06', '2008-02-07', '2008-02-08']}))

DF1_idx = (
    DF1
    .withColumn('id', monotonically_increasing_id())
    .withColumn('columnindex', row_number().over(Window.orderBy('id')))
    .select('columnindex', 'C1', 'C2'))

DF2_idx = (
    DF2
    .withColumn('id', monotonically_increasing_id())
    .withColumn('columnindex', row_number().over(Window.orderBy('id')))
    .select('columnindex', 'C3'))

DF_complete = (
    DF1_idx
    .join(
        other=DF2_idx,
        on=['columnindex'],
        how='inner')
    .select('C1', 'C2', 'C3'))

DF_complete.show()

+---------+----------+----------+
|       C1|        C2|        C3|
+---------+----------+----------+
| 23397414|20875.7353|2008-02-04|
|  5213970|20497.5582|2008-02-05|
| 41323308|20935.7956|2008-02-06|
|123276113|18884.0477|2008-02-07|
| 76456078|18389.9269|2008-02-08|
+---------+----------+----------+

2 Comments

Would one of the downvoters like to give some constructive criticism here? I do not see why this is a bad answer.
It wasn't me, but unfortunately, this method does not guarantee that the row sequence will be the same in the two DataFrames. I tried it with a 10-row DataFrame and the results were scrambled. Shame, because I can't use rdd methods (the rdd API is not whitelisted on the cluster I am using).

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.