
I have a Spark DataFrame that looks like this:

|  time  | col1 | col2 |
|--------|------|------|
| 123456 |   2  |  A   |
| 123457 |   4  |  B   |
| 123458 |   7  |  C   |
| 123459 |   5  |  D   |
| 123460 |   3  |  E   |
| 123461 |   1  |  F   |
| 123462 |   9  |  G   |
| 123463 |   8  |  H   |
| 123464 |   6  |  I   |

Now I need to sort the "col1" column, while the other columns remain in their original order (using PySpark):

|  time  | col1 | col2 | col1_sorted |
|--------|------|------|-------------|
|  same  | same | same |   sorted    |
|--------|------|------|-------------|
| 123456 |   2  |  A   |     1      |
| 123457 |   4  |  B   |     2      |
| 123458 |   7  |  C   |     3      |
| 123459 |   5  |  D   |     4      |
| 123460 |   3  |  E   |     5      |
| 123461 |   1  |  F   |     6      |
| 123462 |   9  |  G   |     7      |
| 123463 |   8  |  H   |     8      |
| 123464 |   6  |  I   |     9      |

Thanks in advance for any help!

  • actually this is already a partition Commented Sep 8, 2020 at 7:47
  • I use spark 2.3.1, is there a solution for spark 2.4.x? Commented Sep 9, 2020 at 7:39

2 Answers


For Spark 2.3.1, you can try a pandas_udf; see below (this assumes the original dataframe is already sorted by the time column):

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType

# output schema = the input schema plus the new col1_sorted column
schema = StructType.fromJson(df.schema.jsonValue()).add('col1_sorted', 'integer')

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def get_col1_sorted(pdf):
    # keep the rows in time order, then attach the sorted col1 values as a new column
    return pdf.sort_values(['time']).assign(col1_sorted=sorted(pdf["col1"]))

df.groupby().apply(get_col1_sorted).show()
+------+----+----+-----------+
|  time|col1|col2|col1_sorted|
+------+----+----+-----------+
|123456|   2|   A|          1|
|123457|   4|   B|          2|
|123458|   7|   C|          3|
|123459|   5|   D|          4|
|123460|   3|   E|          5|
|123461|   1|   F|          6|
|123462|   9|   G|          7|
|123463|   8|   H|          8|
|123464|   6|   I|          9|
+------+----+----+-----------+

2 Comments

I assume this function converts the Spark dataframe into a pandas dataframe?
It does not convert the whole dataframe into a pandas df, but it does use the pandas engine in a distributed way; see spark.apache.org/docs/2.3.1/…
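
To make the pandas side of this concrete, here is a small standalone illustration of what the UDF body does within each group, using made-up toy data rather than the thread's dataframe (a sketch, not part of the original answer):

import pandas as pd

# toy pandas frame standing in for one group handed to the grouped-map UDF
pdf = pd.DataFrame({"time": [123456, 123457, 123458],
                    "col1": [2, 4, 1],
                    "col2": ["A", "B", "C"]})

# same logic as the UDF body: keep time order, attach the sorted col1 values
out = pdf.sort_values(["time"]).assign(col1_sorted=sorted(pdf["col1"]))
print(out)
#      time  col1 col2  col1_sorted
# 0  123456     2    A            1
# 1  123457     4    B            2
# 2  123458     1    C            4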

My own solution is the following:

First, make a copy of df with only col1 selected, ordered by col1:

df_copy = df.select("col1").orderBy("col1")

Second, index both dataframes (the same code is applied to df_copy, just with the window ordered by "col1"):

import sys
from pyspark.sql import functions as F
from pyspark.sql.functions import lit
from pyspark.sql.window import Window

# running window over all preceding rows, ordered by time
w = Window.orderBy("time").rowsBetween(-sys.maxsize, 0)

# the running sum of the helper column yields a 1-based row index
df = df\
            .withColumn("helper", lit(1))\
            .withColumn("index", lit(0))\
            .withColumn("index", F.col("index")+F.sum(F.col("helper")).over(w))

Last step: rename col1 to col1_sorted and join the dataframes:

df_copy = df_copy.withColumnRenamed("col1", "col1_sorted")
    
df = df.join(df_copy, df.index == df_copy.index, how="inner")
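
As a rough sketch of the same idea in a more compact form (my condensation, not the answerer's code): the running sum can be replaced by F.row_number() over each window, and the index used directly as the join key:

import sys
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# condensed variant of the index-and-join approach above
w_time = Window.orderBy("time")
w_col1 = Window.orderBy("col1")

left  = df.withColumn("index", F.row_number().over(w_time))
right = (df.select("col1")
           .withColumn("index", F.row_number().over(w_col1))
           .withColumnRenamed("col1", "col1_sorted"))

result = left.join(right, on="index", how="inner").drop("index")
result.orderBy("time").show()

As with the running-sum version, Spark will warn that a window without a partition clause moves all data into a single partition.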

