I've looked around and couldn't find a solution to my problem. Maybe I'm doing something wrong? I've tried adding the JARs mentioned in other posts, and I set up the S3 access key and secret key. I'm not sure if it has to do with which S3 scheme I use to address the bucket (s3a, s3n, or s3?). I feel like it's a dependency problem, so I focused on that, but I also attempted various combinations of addressing S3. If anyone knows a better way of checking which S3 connector version is appropriate, please comment below. Thank you in advance.
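In case it matters, the JARs I mean are the ones listed under "Extra info" below. My (possibly wrong) understanding is that they could also be supplied from the script itself through spark.jars; something like the sketch below, where the paths are placeholders for wherever the JARs actually live:
# Hypothetical alternative way of loading the extra JARs:
# point spark.jars at them explicitly (paths below are placeholders).
conf_with_jars = (SparkConf()
                  .setAppName("S3 Configuration Test")
                  .set("spark.jars", "/path/to/aws-java-sdk-1.7.4.jar,/path/to/hadoop-aws-2.7.6.jar"))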
from pyspark import SparkConf, SparkContext, SQLContext
from pyspark.sql import SparkSession
When I read the file with boto3 directly, it works:
# import boto3
# s3 = boto3.resource("s3")
# bucket = s3.Bucket("BUCKET")
# obj = bucket.Object(key="All_the_Jokes.txt")
# response = obj.get()
#
# text = response["Body"].read()
# print(text)
When I try using Spark, I run into many issues:
conf = (SparkConf()
        .setAppName("S3 Configuration Test")
        .set("spark.executor.instances", "1")
        .set("spark.executor.cores", "1")
        .set("spark.executor.memory", "2g"))
# Setting up the configuration to access Amazon S3
sc = SparkContext(conf=conf)
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "KEY")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "KEY")
# sc._jsc.hadoopConfiguration().set("fs.s3.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
# sc._jsc.hadoopConfiguration().set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
# Set the log level to OFF to cut down on terminal output
# sc.setLogLevel("OFF")
# SQLContext to run queries
sqlContext = SQLContext(sc)
# SparkSession to create DataFrames and use the read.json method once Amazon S3 is configured properly
ss = SparkSession(sc)
file = sc.textFile("s3n://bucket/All_the_Jokes.txt")
file.count()
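The s3n:// call above is just the last variant I ran. To show what I mean by "various combinations of addressing S3", these are the reads I've been swapping in and out; the bucket name and object key are placeholders for my real values:
# The three URL schemes I've been cycling through for the same bucket/object:
# file = sc.textFile("s3://bucket/All_the_Jokes.txt")
# file = sc.textFile("s3n://bucket/All_the_Jokes.txt")
# file = sc.textFile("s3a://bucket/All_the_Jokes.txt")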
Extra info:
Extra JARs in Spark:
aws-java-sdk-1.7.4.jar
hadoop-aws-2.7.6.jar
Hadoop 2.8.5
Python 2.7.17/Python 3.6.9
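I don't know if this is the right check, but I believe a snippet like the one below would print the Hadoop version the Spark runtime is actually using (through the same py4j gateway the traceback goes through), which I'm assuming the hadoop-aws and aws-java-sdk JARs are supposed to line up with:
# Print the Hadoop version Spark is actually running with.
# Assumption on my part: the hadoop-aws JAR version should match this.
print(sc._jvm.org.apache.hadoop.util.VersionInfo.getVersion())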
Error Result:
Traceback (most recent call last):
File "/home/desktop/PycharmProjects/emr/emr_jbo.py", line 33, in <module>
file.count()
File "/home/desktop/PycharmProjects/emr/venv/lib/python3.6/site-packages/pyspark/rdd.py", line 1055, in count
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/home/desktop/PycharmProjects/emr/venv/lib/python3.6/site-packages/pyspark/rdd.py", line 1046, in sum
return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
File "/home/desktop/PycharmProjects/emr/venv/lib/python3.6/site-packages/pyspark/rdd.py", line 917, in fold
vals = self.mapPartitions(func).collect()
File "/home/desktop/PycharmProjects/emr/venv/lib/python3.6/site-packages/pyspark/rdd.py", line 816, in collect
sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/home/desktop/PycharmProjects/emr/venv/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/desktop/PycharmProjects/emr/venv/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/home/desktop/PycharmProjects/emr/venv/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.io.IOException: No FileSystem for scheme: s3n
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:55)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:273)
at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:269)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.rdd.RDD.partitions(RDD.scala:269)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:990)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:385)
at org.apache.spark.rdd.RDD.collect(RDD.scala:989)
at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)