
I have a PySpark DataFrame with ~70 columns and tens of millions of rows. A few of the columns contain dates (as strings). There are 3 possible date formats - "yyyyMMdd", "yyyy-MM-dd" and "yyyy.MM.dd" - and each row could contain any of them. In other words, different columns and/or rows can contain dates in different formats (don't ask...)

For example:

column1    date1       column2  date2
------------------------------------------
something  20241224    abc      2024.12.24
different  2024-12-24  abc      20241224

Is there an efficient way to convert these dates to pyspark.sql.types.DateType?

I've tried:

from pyspark.sql.functions import to_date, coalesce, col, when

formats = ("yyyyMMdd", "yyyy.MM.dd", "yyyy-MM-dd")
column = col("date1")
df = df.withColumn("date1", coalesce(*(to_date(column, fmt) for fmt in formats)))

and also

df = df.withColumn("date1", 
                   when(column.contains("-"), to_date(column, "yyyy-MM-dd")).
                   when(column.contains("."), to_date(column, "yyyy.MM.dd")).
                   otherwise(to_date(column, "yyyyMMdd"))
                  )

Both approaches work, but they are really slow: for a DataFrame with ~33M records it takes ~9000 seconds to finish.

Is there a smarter/better way to do this? Unfortunately, the requirements are not under my control, so I have to deal with a DataFrame with mixed formats like that; having the dates in a single format is not an option.

2 Answers


The most important thing is to use when. This saves a significant amount of computation time, because checking for a separator character is cheap, while letting to_date fail on a wrong format (which the coalesce approach does for up to two of the three formats per value) is expensive. Beyond that, it becomes harder to find substantive performance improvements.

The to_date function is relatively slow, especially when a format string is provided. It may be convenient in its relative simplicity, but when performance is important, I would look for alternatives such as a plain cast or make_date.


The following are the results of some performance tests. The "best" results are not necessarily the best in general: the differences are small, the numbers vary every time a test like this is run (benchmarking a distributed computation is not easy), and the results will also depend on your data. What can clearly be seen is the conclusion from above: performance improves when you use when and avoid to_date.

Dataframe:

import random

# Generating rows of date strings in random format: "yyyyMMdd", "yyyy.MM.dd" or "yyyy-MM-dd".
data = [
    (random.choice([
        str(random.randint(2020, 2023)) + str(random.randint(1, 12)).zfill(2) + str(random.randint(1, 28)).zfill(2),
        str(random.randint(2020, 2023)) + '-' + str(random.randint(1, 12)).zfill(2) + '-' + str(random.randint(1, 28)).zfill(2),
        str(random.randint(2020, 2023)) + '.' + str(random.randint(1, 12)).zfill(2) + '.' + str(random.randint(1, 28)).zfill(2),
    ]),)
    for _ in range(100000)
]
df = spark.createDataFrame(data, ["date_str"])
df = df.persist()

Functions:

from pyspark.sql import functions as F

def parse_date_coalesce(col):
    formats = ("yyyyMMdd", "yyyy.MM.dd", "yyyy-MM-dd")
    return F.coalesce(*(F.to_date(col, f) for f in formats))

def parse_date_when(col):
    return F.when(F.col(col).contains("-"), F.to_date(col, "yyyy-MM-dd")) \
            .when(F.col(col).contains("."), F.to_date(col, "yyyy.MM.dd")) \
            .otherwise(F.to_date(col, "yyyyMMdd"))

def parse_date_translate_eq(col):
    return F.when(F.col(col)[5:1] == '-', F.col(col).cast('date')) \
            .when(F.col(col)[5:1] == '.', F.translate(col, ".", "-").cast('date')) \
            .otherwise(F.make_date(F.col(col)[1:4], F.col(col)[5:2], F.col(col)[7:2]))

def parse_date_translate_contains(col):
    return F.when(F.col(col).contains("-"), F.col(col).cast('date')) \
            .when(F.col(col).contains("."), F.translate(col, ".", "-").cast('date')) \
            .otherwise(F.make_date(F.col(col)[1:4], F.col(col)[5:2], F.col(col)[7:2]))

def parse_date_make_date_eq(col):
    return F.when(F.col(col)[5:1] == '-', F.col(col).cast('date')) \
            .when(F.col(col)[5:1] == '.', F.make_date(F.col(col)[1:4], F.col(col)[6:2], F.col(col)[9:2])) \
            .otherwise(F.make_date(F.col(col)[1:4], F.col(col)[5:2], F.col(col)[7:2]))

def parse_date_make_date_contains(col):
    return F.when(F.col(col).contains("-"), F.col(col).cast('date')) \
            .when(F.col(col).contains("."), F.make_date(F.col(col)[1:4], F.col(col)[6:2], F.col(col)[9:2])) \
            .otherwise(F.make_date(F.col(col)[1:4], F.col(col)[5:2], F.col(col)[7:2]))
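
A note on the slicing used above: indexing a Column with a slice maps to Column.substr(startPos, length) with a 1-based start position, so F.col(col)[5:1] reads the single character at position 5 (the separator, if any) and F.col(col)[1:4] reads the four-digit year. A minimal illustration on the test DataFrame:

from pyspark.sql import functions as F

# Column[5:1] and Column.substr(5, 1) are the same expression:
# one character starting at 1-based position 5.
df.select(
    F.col("date_str")[5:1].alias("char5_slice"),
    F.col("date_str").substr(5, 1).alias("char5_substr"),
).show(3)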

Results:

%timeit -n 5 -r 5 df.withColumn("date_parsed", parse_date_coalesce('date_str')).write.format("noop").mode("overwrite").save()
# 1.15 s ± 141 ms per loop (mean ± std. dev. of 5 runs, 5 loops each)

%timeit -n 20 -r 20 df.withColumn("date_parsed", parse_date_when('date_str')).write.format("noop").mode("overwrite").save()
# 145 ms ± 25.7 ms per loop (mean ± std. dev. of 20 runs, 20 loops each)

%timeit -n 20 -r 20 df.withColumn("date_parsed", parse_date_translate_eq('date_str')).write.format("noop").mode("overwrite").save()
# 112 ms ± 19.6 ms per loop (mean ± std. dev. of 20 runs, 20 loops each)

%timeit -n 20 -r 20 df.withColumn("date_parsed", parse_date_translate_contains('date_str')).write.format("noop").mode("overwrite").save()
# 110 ms ± 18 ms per loop (mean ± std. dev. of 20 runs, 20 loops each)

%timeit -n 20 -r 20 df.withColumn("date_parsed", parse_date_make_date_eq('date_str')).write.format("noop").mode("overwrite").save()
# 106 ms ± 15.3 ms per loop (mean ± std. dev. of 20 runs, 20 loops each)

%timeit -n 20 -r 20 df.withColumn("date_parsed", parse_date_make_date_contains('date_str')).write.format("noop").mode("overwrite").save()
# 101 ms ± 15.2 ms per loop (mean ± std. dev. of 20 runs, 20 loops each)

IMHO, you can shave off some time, but no more than 20-30% of what you currently have, and that is only for the date-parsing part. Your production script probably has other parts, not to mention cluster/Spark application startup and reading and writing the data, which all contribute to the processing time, so the small boost from date parsing may well become invisible.


1 Comment

Thanks. I was hoping there might be something like df.apply or df.map, or some similar mechanism where I can do all the if/elif/else in a single 'visit' of the row. But apparently this is not the case.
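
(For completeness: a row-wise "if/elif/else" is possible, e.g. via a pandas UDF, but the data has to be serialized between the JVM and Python, so it is usually slower than the built-in when/to_date column expressions above. A minimal, hypothetical sketch, assuming pandas and PyArrow are installed:)

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.types import DateType

# Row-wise parsing in plain Python, applied batch-by-batch to a pandas Series.
# Shown only for comparison; the native column expressions above are faster.
@F.pandas_udf(DateType())
def parse_any_date(s: pd.Series) -> pd.Series:
    def parse_one(v):
        if pd.isna(v):
            return None
        if "-" in v:
            return pd.to_datetime(v, format="%Y-%m-%d").date()
        if "." in v:
            return pd.to_datetime(v, format="%Y.%m.%d").date()
        return pd.to_datetime(v, format="%Y%m%d").date()
    return s.map(parse_one)

df.withColumn("date_parsed", parse_any_date("date_str"))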

In your case, you can remove the dots and hyphens (for example, using the translate function) before running the to_date function:

from pyspark.sql import functions as F

fmt_date = lambda c: F.to_date(F.translate(c, '-.', ''), 'yyyyMMdd')

df_new = df.withColumns({c: fmt_date(c) for c in ['date1', 'date2']})

df_new.show()
#+---------+----------+-------+----------+
#|  column1|     date1|column2|     date2|
#+---------+----------+-------+----------+
#|something|2024-12-24|    abc|2024-12-24|
#|different|2024-12-24|    abc|2024-12-24|
#+---------+----------+-------+----------+
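
One caveat worth adding (not part of the original answer): with the default, non-ANSI parser settings, to_date returns NULL for anything that still does not match yyyyMMdd after the separators are stripped, so values in an unexpected format silently become NULLs. A quick, hypothetical sanity check using the fmt_date helper above:

from pyspark.sql import functions as F

# Count, per date column, the values that were non-null in the input
# but failed to parse after stripping '-' and '.'.
df.select([
    F.count(F.when(F.col(c).isNotNull() & fmt_date(c).isNull(), 1)).alias(c)
    for c in ['date1', 'date2']
]).show()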

