3

I am using pyspark, and got the result rdd from the following code:

import numpy
model = PrefixSpan.train(input_rdd,minSupport=0.1)
result = model.freqSequences().filter(lambda x: (x.freq >= 50)).filter(lambda x: (len(x.sequence) >=2) ).cache()

The input_rdd looks fine when I checked with input_rdd.take(5). The above code created an rdd called 'result', which is of the following format:

PythonRDD[103] at RDD at PythonRDD.scala:48

I did have numpy installed, but when I tried to do result.take(5) or result.count(), I kept getting the following error below.

Py4JJavaErrorTraceback (most recent call last)
<ipython-input-32-7e589dce550c> in <module>()
----> 1 result.take(5)

/usr/local/spark-latest/python/pyspark/rdd.py in take(self, num)
   1308 
   1309             p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
-> 1310             res = self.context.runJob(self, takeUpToNumLeft, p)
   1311 
   1312             items += res

/usr/local/spark-latest/python/pyspark/context.py in runJob(self, rdd, partitionFunc, partitions, allowLocal)
    939         # SparkContext#runJob.
    940         mappedRDD = rdd.mapPartitions(partitionFunc)
--> 941         port = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
    942         return list(_load_from_socket(port, mappedRDD._jrdd_deserializer))
    943 

/usr/local/spark-latest/python/lib/py4j-0.10.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    931         answer = self.gateway_client.send_command(command)
    932         return_value = get_return_value(
--> 933             answer, self.gateway_client, self.target_id, self.name)
    934 
    935         for temp_arg in temp_args:

/usr/local/spark-latest/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/local/spark-latest/python/lib/py4j-0.10.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    310                 raise Py4JJavaError(
    311                     "An error occurred while calling {0}{1}{2}.\n".
--> 312                     format(target_id, ".", name), value)
    313             else:
    314                 raise Py4JError(

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 42.0 failed 4 times, most recent failure: Lost task 0.3 in stage 42.0 (TID 85, ph-hdp-abc-dn07): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/data/0/yarn/nm/usercache/abc-test/appcache/application_1482412711394_0011/container_e16_1482412711394_0011_01_000002/pyspark.zip/pyspark/worker.py", line 161, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/data/0/yarn/nm/usercache/abc-test/appcache/application_1482412711394_0011/container_e16_1482412711394_0011_01_000002/pyspark.zip/pyspark/worker.py", line 54, in read_command
    command = serializer._read_with_length(file)
  File "/data/0/yarn/nm/usercache/abc-test/appcache/application_1482412711394_0011/container_e16_1482412711394_0011_01_000002/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/data/0/yarn/nm/usercache/abc-test/appcache/application_1482412711394_0011/container_e16_1482412711394_0011_01_000002/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
  File "/data/0/yarn/nm/usercache/abc-test/appcache/application_1482412711394_0011/container_e16_1482412711394_0011_01_000002/pyspark.zip/pyspark/mllib/__init__.py", line 28, in <module>
ImportError: No module named numpy

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:193)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:234)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:152)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
    at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:935)
    at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:63)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
    at org.apache.spark.scheduler.Task.run(Task.scala:85)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Does anyone know what I missed? Thanks!

3
  • Can you show how you arrived at the RDD. The error is being caused beforehand thus it can not show you elements of the RDD when you call on it. Commented Dec 23, 2016 at 19:47
  • @DmitryPolonskiy: I have modified the question to include how the result rdd generated. Please advice. Thanks! Commented Dec 23, 2016 at 21:06
  • What @user said might be valid, but also the fact that you're getting an error when trying to collect result could mean there is an error with the code you are using to arrive at result. Commented Dec 24, 2016 at 15:35

2 Answers 2

3

If driver side import doesn't fail if means that numpy is not accessible in the executor interpreters. This can arise in a few cases:

  • you've installed numpy only on the driver node (numpy is missing on the worker nodes).
  • you've installed numpy on the worker nodes but workers are incorrectly configured:

    • numpy is installed but missing from the interpreter path.
    • numpy is installed but workers use different environment / interpreter than a one with installed numpy.
Sign up to request clarification or add additional context in comments.

Comments

2

I replicated your code and it appears @user7337271 is correct. This specific module needs numpy to work as is seen in the first several lines of the source code. Here is my code working to validate that it is in fact an issue with numpy possibly only being installed on your master node.

import numpy
from pyspark.mllib.fpm import PrefixSpan

data = [[["a", "b"], ["c"]],[["a"], ["c", "b"], ["a", "b"]],[["a", "b"], ["e"]],[["f"]]]
rdd = sc.parallelize(data)
model = PrefixSpan.train(rdd, minSupport=0.1)
result = model.freqSequences().filter(lambda x: (x.freq >= 2)).filter(lambda x: (len(x.sequence) >=2) ).cache()
result.collect()

[FreqSequence(sequence=[[u'a'], [u'c']], freq=2)]

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.