
I'm new to PySpark. I want to add a new column with multiple values and then partition the data by those values.

import math
from pyspark.sql import functions as F

coun = df.count()

if coun <= 20000:
    chunksize = 2
    rowsperchunk = math.ceil(coun / 2)
else:
    chunksize = math.ceil(coun / 20000)
    rowsperchunk = 20000

for i in range(chunksize):
    df.limit(rowsperchunk).withColumn('chunk', F.lit(i))

In the for loop above, every iteration calls limit() on the same DataFrame, so it only ever tags the (same) first rowsperchunk rows with a single chunk value, and the result of withColumn is never kept.

Example: I have 100k rows in my DataFrame, so chunksize will be 5 and rowsperchunk will be 20,000. I need to add a new column where the first 20,000 rows get the value 1, the next 20,000 rows get the value 2, and so on up to chunksize. Then I want to partition the data based on this new column.
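To make the intent concrete, here is a rough sketch of what I imagine the assignment to look like (not working code I have; row_number needs an explicit ordering, so I order by a monotonically increasing id to approximate the current row order, and the output path is just a placeholder):

from pyspark.sql import functions as F
from pyspark.sql import Window

# Number every row, then assign chunk = the block of `rowsperchunk`
# rows it falls into (1, 2, ..., chunksize).
# Caveat: a window without partitionBy pulls all rows into one task.
w = Window.orderBy(F.monotonically_increasing_id())
df_chunked = df.withColumn(
    'chunk', F.floor((F.row_number().over(w) - 1) / rowsperchunk) + 1
)
df_chunked.write.partitionBy('chunk').parquet('/tmp/chunked')  # placeholder path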


1 Answer


So you want to repartition the data into partitions of the same size, while preserving the order.

It is not that easy in Spark. What I would do is start by counting the size of each partition. Then, for each partition, I would compute the number of records that sit in the previous partitions of the DataFrame. With that cumulative count and the rank of the record within its partition (partition_rank), a division by the desired partition size gives the new allocation. Note that I introduce an index column to compute the rank and preserve the order. Here is the code:

partition_size = 20000

from pyspark.sql import functions as F
from pyspark.sql import Window

# Count the number of records in each existing Spark partition.
part_counts = df.withColumn("p", F.spark_partition_id()).groupBy("p").count().collect()
part_counts.sort()
part_counts = [(x[0], x[1]) for x in part_counts]

# For each partition, compute how many records sit in the partitions before it.
cum_part_counts = []
running_total = 0
for index, count in part_counts:
    cum_part_counts.append((index, running_total))
    running_total += count
cum_part_counts_df = spark.createDataFrame(cum_part_counts, ['partition_index', 'count'])

# Rank each record within its partition, add the cumulative offset of the
# partition, and divide by the target size to get the new partition number.
repartitioned_df = df\
  .withColumn("partition_index", F.spark_partition_id())\
  .withColumn("index", F.monotonically_increasing_id())\
  .withColumn("partition_rank", F.rank().over(
           Window.partitionBy("partition_index").orderBy("index")))\
  .join(cum_part_counts_df, ['partition_index'])\
  .withColumn("new_partition",
      F.floor((F.col("count") + F.col("partition_rank") - 1)/partition_size))\
  .orderBy("index")

repartitioned_df.write.partitionBy("new_partition").parquet("...")
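For example, a record with partition_rank 3 whose partition is preceded by 40000 records ends up in floor((40000 + 3 - 1) / 20000) = partition 2. To sanity-check the result, one could read the output back and count the rows per new_partition value (a hypothetical check; the "..." path is the same placeholder as above):

# Every new_partition except possibly the last should contain
# exactly partition_size rows.
spark.read.parquet("...") \
    .groupBy("new_partition").count() \
    .orderBy("new_partition") \
    .show()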
