
I am new to Pyspark and trying to use spark.read method to read S3 files in dataframe. I was able to successfully read one file from S3. Now I need to iterate and read all the files in a bucket.

My question is how to iterate and get all the files one by one.

I used to do this in Python using boto3 (`s3_client.list_objects`); is there something similar in Pyspark?

  • Depending on permissions and where you are running pyspark, you can install boto3. Otherwise, you might want to look into wildcarding. Something like spark.read.parquet('s3://my_bucket/*/*/*') Commented May 8, 2023 at 15:28

4 Answers


What if you use the SparkSession and SparkContext to read the files at once, then loop through the S3 directory using the wholeTextFiles method? You can use the s3a connector in the URL, which lets Spark read from S3 through Hadoop.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('S3Example').getOrCreate()

s3_bucket = 'your-bucket'
s3_path = f's3a://{s3_bucket}/my-directory/'

# List the file paths in S3 (note: wholeTextFiles also reads the file contents)
file_list = spark.sparkContext.wholeTextFiles(s3_path).map(lambda x: x[0]).collect()

for file_path in file_list:
    print(file_path)

Please note, above I've only retrieved the file paths. If you want both the path and the contents, skip extracting only the path (the x[0] in the lambda) and keep the full tuple:

file_tuple = spark.sparkContext.wholeTextFiles(s3_path)
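A sketch of working with those tuples (the `split_pairs` helper is hypothetical; assumes the `s3_path` defined above):

```python
def split_pairs(pairs):
    """Split (path, content) tuples into separate path and content lists."""
    paths = [p for p, _ in pairs]
    contents = [c for _, c in pairs]
    return paths, contents

# wholeTextFiles yields (path, content) pairs; collect() pulls them all to the
# driver, so this is only sensible for small directories:
# pairs = spark.sparkContext.wholeTextFiles(s3_path).collect()
# paths, contents = split_pairs(pairs)
```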

2 Comments

Is there a way to limit the number of files read/listed? I have a huge directory and the top 10 would be sufficient
Also FWIW I don't find the s3a connector necessary with PySpark 3.5.0, plain s3 works fine

You can use s3fs (install it with `pip install s3fs`) and try the code below:

import s3fs

fs = s3fs.S3FileSystem(anon=True)
fs.ls('my-bucket')

Also see this doc.
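One caveat if you combine this with Spark: `fs.ls` returns bucket-relative keys with no scheme, so you need to prepend `s3://` (or `s3a://`) before handing the paths to `spark.read`. A small sketch (the `with_scheme` helper is hypothetical):

```python
def with_scheme(keys, scheme='s3a'):
    """Prepend a URL scheme to the bucket-relative keys returned by s3fs ls()."""
    return [f'{scheme}://{key}' for key in keys]

# fs = s3fs.S3FileSystem(anon=True)
# paths = with_scheme(fs.ls('my-bucket'))  # e.g. ['s3a://my-bucket/part-0.parquet', ...]
# df = spark.read.parquet(*paths)
```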

Another way is using hadoopFile:

SparkContext.hadoopFile("s3a://bucket_name/prefix")

refer to this doc.

2 Comments

I have already authenticated S3 connection in my spark session. If I use s3fs method, I will need one more connection to S3 just to list files and then spark session to read files. This is why I am trying to figure out if there is a way to list files using same spark session.
In that case you can use SparkContext.hadoopFile("s3a://bucket_name/prefix")
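To stay on a single authenticated connection, you can also list files through the Hadoop FileSystem already bound to your Spark session, so the same credentials are reused. This goes through the `_jvm`/`_jsc` gateway attributes, which are technically internal, so treat it as a sketch; `keep_top_n` is a hypothetical helper for the "top 10 files" case asked about in the comments above:

```python
def keep_top_n(paths, n):
    """Cheap client-side limit: first n paths in sorted order."""
    return sorted(paths)[:n]

# jvm = spark.sparkContext._jvm
# conf = spark.sparkContext._jsc.hadoopConfiguration()
# root = jvm.org.apache.hadoop.fs.Path('s3a://bucket_name/prefix/')
# fs = root.getFileSystem(conf)
# paths = [f.getPath().toString() for f in fs.listStatus(root)]
# top_ten = keep_top_n(paths, 10)
```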

In addition to all the other answers:

You can still use boto3 lib to get the list of objects with full path like below:

import boto3

response = boto3.client('s3').list_objects_v2(Bucket=bucket, Prefix=prefix)
full_paths_list = []
for obj in response["Contents"]:
    full_path = f's3://{bucket}/{obj["Key"]}'
    full_paths_list.append(full_path)

Now that you have the file list, you can read the files with pyspark like below:

df = spark.read.parquet(*full_paths_list) 
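Note that `list_objects_v2` returns at most 1000 keys per call; for larger buckets, use a paginator. A sketch (the `keys_from_pages` helper is hypothetical):

```python
def keys_from_pages(pages):
    """Flatten the 'Contents' entries of each response page into one key list."""
    return [obj['Key'] for page in pages for obj in page.get('Contents', [])]

# paginator = boto3.client('s3').get_paginator('list_objects_v2')
# pages = paginator.paginate(Bucket=bucket, Prefix=prefix)
# full_paths_list = [f's3://{bucket}/{key}' for key in keys_from_pages(pages)]
```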

Comments


You can use boto3 with pyspark to list the objects in an S3 bucket and read them into a PySpark DataFrame:

  1. Iterate over the list of objects and read each file into a PySpark DataFrame using the spark.read method.

  2. Append each DataFrame to a list and then union all dataframes into one using the reduce function.

import boto3
from functools import reduce  # needed for the union step below
from pyspark.sql import SparkSession

# Create a SparkSession
spark = SparkSession.builder.appName("Read from S3").getOrCreate()

# Set up a boto3 client
s3 = boto3.client('s3')

# List all objects in the S3 bucket
bucket_name = "your-bucket-name"
prefix = "path-to-folder-within-bucket/"  # optional: if you want to read only files in a particular folder
objects = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)

# Read each file into a PySpark DataFrame
dataframes = []
for obj in objects['Contents']:
    file_name = obj['Key']
    s3_path = f's3://{bucket_name}/{file_name}'
    df = spark.read.option("header", True).csv(s3_path)
    dataframes.append(df)

# Union all dataframes into one
final_df = reduce(lambda a, b: a.union(b), dataframes)

# Show the final dataframe
final_df.show()
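When all the files share a schema, the per-file loop and union can be skipped entirely: `spark.read.csv` accepts a list of paths directly. A sketch (the `csv_paths` helper is hypothetical):

```python
def csv_paths(bucket, contents):
    """Turn list_objects_v2 'Contents' entries into full s3:// paths."""
    return [f's3://{bucket}/{obj["Key"]}' for obj in contents]

# paths = csv_paths(bucket_name, objects['Contents'])
# final_df = spark.read.option("header", True).csv(paths)
```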

4 Comments

Thanks Nipun. One basic question since I am just starting: I see that in the boto3 client initiation you are not using an AWS secret etc. Does this mean you suggest storing the credentials in a common config file so they are automatically picked up by both boto3 and the Spark session?
Never use inferSchema when reading CSV files off an object store unless you want to force Spark to read every file during query planning.
@PythonDeveloper you can add it to common config file but then you need to add s3 secret in the boto3.client and reference the secret from environment file.
@stevel Yes you are correct. I've updated my answer based on your input.
