
I've looked around and couldn't find a solution to my problem. Maybe I'm doing something wrong? I've tried adding the jars mentioned in other posts, and I set up the S3 access key and secret key. I'm not sure if it has to do with which S3 scheme I address (s3a, s3n, or s3?). I feel like it's a dependency problem, so I focused on that, but I also attempted various combinations of addressing S3. If anyone knows a better way of checking which S3 connector version is appropriate, please comment down below. Thank you in advance.

from pyspark import SparkConf, SparkContext, SQLContext

from pyspark.sql import SparkSession

When I try this it reads the file:

# import boto3
#
# s3 = boto3.resource("s3")
# bucket = s3.Bucket("BUCKET")
# obj = bucket.Object(key="All_the_Jokes.txt")
# response = obj.get()
#
# text = response["Body"].read()
# print(text)

When I try using spark I run into many issues:

conf = (SparkConf()
        .setAppName("S3 Configuration Test")
        .set("spark.executor.instances", "1")
        .set("spark.executor.cores", 1)
        .set("spark.executor.memory", "2g"))
# Setting up the configuration to access Amazon S3
sc = SparkContext(conf=conf)
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "KEY")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "KEY")
# sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
# sc._jsc.hadoopConfiguration().set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

# Set log level to off for faster performance of the terminal
# sc.setLogLevel("OFF")
# Sql context to run queries
sqlContext = SQLContext(sc)
# Spark Session to create DataFrames and use the read.json method after proper configuration of Amazon S3
ss = SparkSession(sc)

file = sc.textFile("s3n://bucket/All_the_Jokes.txt")
file.count()

Extra info:

Extra Jars in spark:
aws-java-sdk-1.7.4.jar
hadoop-aws-2.7.6.jar
Hadoop 2.8.5
Python 2.7.17/Python 3.6.9


Error Result:
Traceback (most recent call last):
  File "/home/desktop/PycharmProjects/emr/emr_jbo.py", line 33, in <module>
    file.count()
  File "/home/desktop/PycharmProjects/emr/venv/lib/python3.6/site-packages/pyspark/rdd.py", line 1055, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/home/desktop/PycharmProjects/emr/venv/lib/python3.6/site-packages/pyspark/rdd.py", line 1046, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/home/desktop/PycharmProjects/emr/venv/lib/python3.6/site-packages/pyspark/rdd.py", line 917, in fold
    vals = self.mapPartitions(func).collect()
  File "/home/desktop/PycharmProjects/emr/venv/lib/python3.6/site-packages/pyspark/rdd.py", line 816, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/home/desktop/PycharmProjects/emr/venv/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/home/desktop/PycharmProjects/emr/venv/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/home/desktop/PycharmProjects/emr/venv/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.io.IOException: No FileSystem for scheme: s3n
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
    at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
    at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
    at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:55)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
    at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)

2 Answers

After some trial and error and reading, these are the steps I found to make it work.

1) Set up the S3 native file system using these commands:

sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

2) Set up environment variables with this command:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell'

3) Set the S3 URL as follows: "s3n://ACCESSKEY:SECRETKEY@BUCKET/FILE.txt"

4) Add these jar files to the Spark jars directory (unsure if necessary): aws-java-sdk-1.7.4.jar, hadoop-aws-2.7.6.jar

These were the steps I took to be able to use the Spark framework to read directly from S3. These are the tools and versions I used: Hadoop 2.8.5, Python 2.7.17/Python 3.6.9. Hopefully this is helpful to anyone trying to get this to work. Thank you all.
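Putting the steps together, here is a minimal end-to-end sketch under those assumptions (the bucket name, object key, and credentials are placeholders, and the package versions are the ones listed above):

import os

# Step 2 first: PYSPARK_SUBMIT_ARGS must be set before the SparkContext is created,
# so the hadoop-aws and AWS SDK jars end up on the classpath.
os.environ['PYSPARK_SUBMIT_ARGS'] = ('--packages com.amazonaws:aws-java-sdk-pom:1.10.34,'
                                     'org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell')

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("S3 Configuration Test")
        .set("spark.executor.instances", "1")
        .set("spark.executor.cores", 1)
        .set("spark.executor.memory", "2g"))
sc = SparkContext(conf=conf)

# Step 1: register the native S3 filesystem for the s3 and s3n schemes
sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
sc._jsc.hadoopConfiguration().set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")

# Step 3: credentials embedded in the URL (ACCESSKEY, SECRETKEY, BUCKET are placeholders)
rdd = sc.textFile("s3n://ACCESSKEY:SECRETKEY@BUCKET/All_the_Jokes.txt")
print(rdd.count())

One caveat: if the secret key contains characters such as "/" or "+", embedding it in the URL can break the path, so it may be safer to pass the credentials through the fs.s3n.awsAccessKeyId / fs.s3n.awsSecretAccessKey properties instead (as in the other answer below).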



Your properties seem incorrect.

Instead of

sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "KEY")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "KEY")

Try

sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", accessKeyId)
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", secretAccessKey)

Provide the correct accessKeyId and secretAccessKey, use a path starting with s3n, and you should be able to connect.
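For example, a minimal sketch of that suggestion (placeholder credentials and bucket name; it assumes the hadoop-aws and aws-java-sdk jars are already on Spark's classpath):

from pyspark import SparkContext

sc = SparkContext(appName="S3 Configuration Test")

# s3n reads its credentials from the fs.s3n.* properties, not fs.s3a.*
sc._jsc.hadoopConfiguration().set("fs.s3n.awsAccessKeyId", "ACCESS_KEY_ID")
sc._jsc.hadoopConfiguration().set("fs.s3n.awsSecretAccessKey", "SECRET_ACCESS_KEY")

# The URL scheme must match the properties that were set: s3n here
rdd = sc.textFile("s3n://BUCKET/All_the_Jokes.txt")
print(rdd.count())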

1 Comment

Thank you for catching that error on my end. It did not solve my problem, but I did manage to get it to work. I will answer my own question. Thank you.
