3

Using Spark 2.2.0 on macOS High Sierra. I'm running a Spark Streaming application to read a local file:

val lines = ssc.textFileStream("file:///Users/userName/Documents/Notes/MoreNotes/sampleFile")
lines.print()

This gives me:

org.apache.spark.streaming.dstream.FileInputDStream logWarning - Error finding new files
java.lang.NullPointerException
    at scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:192)

The file exists, and I am able to read it using SparkContext (sc) from spark-shell in the terminal. For some reason, running it through IntelliJ with Spark Streaming does not work. Any ideas appreciated!

1
  • How are you running it? You should bundle a jar and then run it from the shell, or remote deploy from IntelliJ. Commented Mar 14, 2018 at 19:47

2 Answers

2

Quoting the doc comments of textFileStream:

Create an input stream that monitors a Hadoop-compatible filesystem for new files and reads them as text files (using key as LongWritable, value as Text and input format as TextInputFormat). Files must be written to the monitored directory by "moving" them from another location within the same file system. File names starting with . are ignored.

@param directory HDFS directory to monitor for new file

So the method expects a directory path as its parameter, and pointing it at the directory rather than the file should avoid that error:

ssc.textFileStream("file:///Users/userName/Documents/Notes/MoreNotes/")
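
For context, here is a minimal self-contained version of the fix (a sketch assuming local execution and a 5-second batch interval; adjust both to your setup, and note the StreamingNotes object name is just for illustration):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object StreamingNotes {
  def main(args: Array[String]): Unit = {
    // Run locally; file streams don't need a dedicated receiver thread
    val conf = new SparkConf().setAppName("StreamingNotes").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    // Monitor the directory, not a single file
    ssc.textFileStream("file:///Users/userName/Documents/Notes/MoreNotes/").print()

    ssc.start()
    ssc.awaitTermination()
  }
}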

2 Comments

The NullPointerException is gone, but it isn't picking up the file or printing its contents with lines.print(); the job just prints empty batches:

-------------------------------------------
Time: 1521058915000 ms
-------------------------------------------
-------------------------------------------
Time: 1521058920000 ms
-------------------------------------------
Turns out it only picks up new files that land in the directory after the stream starts; files that are merely moved or renamed into it are not detected.
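
FileInputDStream selects files by modification timestamp, so a file is only picked up if it appears in the monitored directory with a recent timestamp after the stream starts. A small sketch (Scala, with illustrative paths) of writing a fresh file to a staging location and then moving it into the monitored directory while the job is running:

import java.nio.file.{Files, Paths, StandardCopyOption}
import java.nio.charset.StandardCharsets

// Write to a staging location first, then move into the monitored
// directory so the file appears atomically with a fresh timestamp.
// Paths are illustrative; substitute your own.
val staging = Paths.get("/Users/userName/tmp/sampleFile")
Files.write(staging, "hello streaming".getBytes(StandardCharsets.UTF_8))
Files.move(
  staging,
  Paths.get("/Users/userName/Documents/Notes/MoreNotes/sampleFile"),
  StandardCopyOption.ATOMIC_MOVE
)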
0

Spark Streaming will not read pre-existing files, so first run the spark-submit command and then create the local file in the monitored directory. Make sure you pass only the directory name to spark-submit, not a file name. Below is a sample command; here the directory to monitor is passed as the first program argument, and you can specify this path in your Scala program as well.

spark-submit --class com.spark.streaming.streamingexample.HdfsWordCount --jars /home/cloudera/pramod/kafka_2.12-1.0.1/libs/kafka-clients-1.0.1.jar --master local[4] /home/cloudera/pramod/streamingexample-0.0.1-SNAPSHOT.jar /pramod/hdfswordcount
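
The command references a HdfsWordCount class that isn't shown; a minimal sketch of what such a program could look like (an assumed structure, not the author's actual code), reading the directory to monitor from args(0):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object HdfsWordCount {
  def main(args: Array[String]): Unit = {
    // args(0): the directory to monitor, passed as the first
    // program argument on the spark-submit line above
    val conf = new SparkConf().setAppName("HdfsWordCount")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Count words in each batch of newly arrived files
    val counts = ssc.textFileStream(args(0))
      .flatMap(_.split("\\s+"))
      .map((_, 1))
      .reduceByKey(_ + _)
    counts.print()

    ssc.start()
    ssc.awaitTermination()
  }
}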
