5

I want to read multiple CSV files with different numbers of columns using PySpark.

Files=['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']

f1 has 50 columns; f2 has 10 more, for a total of 60; f3 has 30 more than f1, for a total of 80; and so on.

However,

df = spark.read.csv(Files,header=True)

gives only 50 columns. I am expecting 80 columns. Since f1 has only 50 columns, the remaining 30 columns should be filled with NaN values for the f1 rows, and likewise for the other CSV files. A pandas DataFrame gives me all 80 columns perfectly:

import pandas as pd
df = pd.concat(map(pd.read_csv, ['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']))
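To make the pandas behavior concrete, here is a minimal sketch with two made-up in-memory CSVs standing in for f1.csv and f2.csv (the column names `a`, `b`, `c` and the values are assumptions for illustration only). `pd.concat` aligns the frames by column name and fills the column missing from the narrower file with NaN:

```python
import io

import pandas as pd

# Two tiny in-memory CSVs standing in for f1.csv and f2.csv (made-up data).
narrow = io.StringIO("a,b\n1,2\n")
wide = io.StringIO("a,b,c\n3,4,5\n")

# concat aligns on column names; columns absent from a frame become NaN.
combined = pd.concat(map(pd.read_csv, [narrow, wide]), ignore_index=True)

print(list(combined.columns))         # ['a', 'b', 'c']
print(combined["c"].isna().tolist())  # [True, False]
```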

But I can't do the same thing with PySpark. How can I read all the columns of the above 5 CSV files into a single Spark dataframe?

3
  • Are these files related in some way? Do they have the same number of rows and the rows are all in order? Commented Feb 1, 2023 at 3:24
  • Each day in the Backblaze data center, we take a snapshot of each operational hard drive. This snapshot includes basic drive information along with the S.M.A.R.T. statistics reported by that drive. The daily snapshot of one drive is one record or row of data. All of the drive snapshots for a given day are collected into a file consisting of a row for each active hard drive. Commented Feb 1, 2023 at 3:38
  • backblaze.com/b2/hard-drive-test-data.html is the data source Commented Feb 1, 2023 at 3:38

3 Answers

2

You can read each file into its own Spark dataframe and then combine all of them into one dataframe using union.

First, fill in the missing columns in the dataframes that have fewer columns.

Then merge them by applying union across the list with reduce.

from functools import reduce
from pyspark.sql.functions import lit, col

df_list = [spark.read.csv("f{}.csv".format(i), header=True) for i in range(1, 6)]

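# Take the target column order from the widest dataframe (columns were added incrementally).
all_cols = max(df_list, key=lambda df: len(df.columns)).columns

# Keep existing columns and add each missing one as a null string column under its real name.
df_list = [df.select(*[col(c) if c in df.columns else lit(None).cast("string").alias(c) for c in all_cols]) for df in df_list]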

df_final = reduce(lambda x, y: x.union(y), df_list)

I reproduced your case on this github.
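As an alternative to filling the columns by hand, Spark 3.1 and later provide `DataFrame.unionByName` with `allowMissingColumns=True`, which matches columns by name and null-fills the ones a dataframe lacks. This is a different technique from the positional `union` above, shown as a rough sketch; the helper name `union_by_name` is made up for illustration, and the usage lines assume an active SparkSession called `spark` and the `Files` list from the question:

```python
from functools import reduce


def union_by_name(dfs):
    """Union Spark dataframes by column name, null-filling missing columns.

    Relies on DataFrame.unionByName(other, allowMissingColumns=True),
    which is available from Spark 3.1 onward.
    """
    return reduce(
        lambda left, right: left.unionByName(right, allowMissingColumns=True),
        dfs,
    )


# Usage (not run here; assumes `spark` and `Files` exist as in the question):
# df_list = [spark.read.csv(path, header=True) for path in Files]
# df_final = union_by_name(df_list)
```

Because the match is by name rather than by position, this does not depend on the extra columns always being appended at the end of each file.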


7 Comments

How does this line work: spark.read.csv("f{}.csv".format(i), header=True) for i in range(1, 6)?
The file names are generated by "f{}.csv".format(i), where {} is a placeholder for the value of i supplied by the for loop. On each iteration i is incremented by 1 and the placeholder in "f{}.csv" is replaced by the current value of i, giving f1.csv, f2.csv, and so on. Please accept the answer if that solves your problem.
How will it access the CSV files from a list, say Files=['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']?
I tried this, df_list=[spark.read.csv(List5Files,header=True)] but it didn't work
Instead of f{}.csv, use Data/f{}.csv
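To tie the comments above together, a small sketch showing that the format-based names are the same paths as the `Files` list from the question, so the list can also be iterated directly. The variable name `generated` is made up for illustration, and the commented Spark line assumes an active SparkSession called `spark`:

```python
# The list from the question.
Files = ['Data/f1.csv', 'Data/f2.csv', 'Data/f3.csv', 'Data/f4.csv', 'Data/f5.csv']

# The same paths built with str.format, as in the answer's list comprehension.
generated = ["Data/f{}.csv".format(i) for i in range(1, 6)]

print(generated == Files)  # True

# So one dataframe per file can be read straight from the list (not run here):
# df_list = [spark.read.csv(path, header=True) for path in Files]
```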
1

It was a very easy fix. Here is what I did:

Files=['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']
Files.reverse()
df = spark.read.csv(Files,inferSchema=True, header=True)

The last file has all the columns because columns were added incrementally. Reversing the list puts the widest file first, which solved the issue.
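The reversal works here only because the widest file happens to be last. If the order is not known in advance, one possible approach is to sort the paths by header width before passing them to Spark. The sketch below is an assumption-laden illustration: the helper name `widest_first` is made up, it reads the header with Python's `csv` module and so needs the files on a local filesystem, and it still relies on the widest file containing every column:

```python
import csv


def widest_first(paths):
    """Return the paths ordered so the file with the most header columns comes first."""

    def header_width(path):
        # Read only the header row; assumes each file is non-empty.
        with open(path, newline="") as handle:
            return len(next(csv.reader(handle)))

    return sorted(paths, key=header_width, reverse=True)


# Usage (not run here; assumes `spark` and `Files` exist as in the question):
# df = spark.read.csv(widest_first(Files), inferSchema=True, header=True)
```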


0

In more recent versions of Spark (currently 3.4.1), adding the mergeSchema option effectively allows the later, wider dataframes to be fully integrated with the earlier, narrower ones.

Files=['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']

df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .option("mergeSchema", True)
    .csv(Files)
)
