
Can you help me find the error in this code? The file does exist, but I suspect Spark is looking for it in HDFS: sc.textFile("/user/spark/archivo.csv")

Or can you explain why this error occurs?

Execution

export PYSPARK_PYTHON=python3
export PYSPARK_DRIVER_PYTHON=python3
spark-submit --queue=OID Proceso_Match1.py

Python

import os
import sys

from pyspark.sql import HiveContext, Row
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *

if __name__ == '__main__':
    conf = SparkConf().setAppName("Spark RDD").set("spark.speculation", "true")
    sc = SparkContext(conf=conf)
    sc.setLogLevel("OFF")
    sqlContext = HiveContext(sc)

    #rddCentral = sc.textFile("hdfs:///user/spark/archivo.csv")
    rddCentral = sc.textFile("/user/spark/archivo.csv")

    rddCentralMap = rddCentral.map(lambda line: line.split(","))

    print('paso 1')
    dfCentral = sqlContext.createDataFrame(rddCentralMap, ["ROWID_CDR", "DURACION", "FECHA_LLAMADA", "FECHA_LLAMADA_2", "MATCH"])
    dfCentral = dfCentral.withColumn("FECHA_LLAMADA_NUM", dfCentral.FECHA_LLAMADA_2.cast(IntegerType()))
    dfCentral = dfCentral.withColumn("DURACION_NUM", dfCentral.DURACION.cast(IntegerType()))
    dfCentral = dfCentral.withColumn("MATCH_NUM", dfCentral.MATCH.cast(IntegerType()))

    sc.stop()

Error log

22/09/30 12:49:14 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
paso 1
/usr/local/bin/python3/lib/python3.7/site-packages/pandas/compat/__init__.py:124: UserWarning: Could not import the lzma module. Your installed Python is incomplete. Attempting to use lzma compression will result in a RuntimeError.
  warnings.warn(msg)
Traceback (most recent call last):
  File "/home/aic_proceso_vfs/rjaimea/vfs_504/bin/Proceso_Match1.py", line 21, in <module>
    dfCentral = sqlContext.createDataFrame(rddCentralMap, ["ROWID_CDR","DURACION","FECHA_LLAMADA","FECHA_LLAMADA_2","MATCH"])
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, cl-hdp-cdp-dn7.cse-cph.int, executor 1): java.io.IOException: Cannot run program "python3": error=2, No such file or directory
        at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
Caused by: java.io.IOException: error=2, No such file or directory
        ... 16 more

HDFS file

hdfs dfs -ls /user/spark

Found 3 items
drwxr-xr-x   - spark       hdfs           0 2022-07-25 10:11 /user/spark/.sparkStaging
-rw-------   3 hadoopadmin hdfs          21 2022-09-30 12:25 /user/spark/archivo.csv
drwxrwxrwt   - spark       spark          0 2022-09-30 12:33 /user/spark/driverLogs
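
As an aside from the listing above (separate from the python3 traceback): archivo.csv is owned by hadoopadmin with mode -rw-------, so the user the job runs as may not be able to read it even after the Python problem is fixed. A hedged sketch of opening up the permissions, assuming it is run as hadoopadmin or an HDFS superuser:

hdfs dfs -chmod 644 /user/spark/archivo.csv    # let group and others read the file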


  • Your error says the python3 executable doesn't exist on the Spark/YARN worker nodes, not that your file doesn't exist. Commented Sep 30, 2022 at 20:41
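
That comment identifies the actual failure: PYSPARK_PYTHON=python3 is only exported on the machine running spark-submit, and the YARN executors on the data nodes cannot find a python3 on their PATH. A minimal sketch of one common fix, assuming /usr/bin/python3 is an interpreter path that exists on every node of the cluster (that path is an assumption; adjust it to your environment):

export PYSPARK_PYTHON=/usr/bin/python3          # full path resolvable on every node
export PYSPARK_DRIVER_PYTHON=/usr/bin/python3

spark-submit --queue=OID \
    --conf spark.pyspark.python=/usr/bin/python3 \
    --conf spark.pyspark.driver.python=/usr/bin/python3 \
    Proceso_Match1.py

On YARN, spark.yarn.appMasterEnv.PYSPARK_PYTHON and spark.executorEnv.PYSPARK_PYTHON can be set to the same path instead.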

1 Answer


I'm not sure, but it seems like you're misusing the schema argument of the DataFrame being created.

The line dfCentral = sqlContext.createDataFrame(rddCentralMap, ["ROWID_CDR","DURACION","FECHA_LLAMADA","FECHA_LLAMADA_2","MATCH"])

takes the object that represents your DataFrame's data (here, an RDD) as the first parameter and the schema as the second, and you've given it a plain list of strings as that second argument.

To create the DataFrame with an explicit schema, wrap each field name in that list in a StructField and collect them into a StructType.

Your program would then look like this:

import os
import sys

from pyspark.sql import HiveContext, Row
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *  # where StructType and StructField come from

if __name__ == '__main__':
    conf = SparkConf().setAppName("Spark RDD").set("spark.speculation", "true")
    sc = SparkContext(conf=conf)
    sc.setLogLevel("OFF")
    sqlContext = HiveContext(sc)

    #rddCentral = sc.textFile("hdfs:///user/spark/archivo.csv")
    rddCentral = sc.textFile("/user/spark/archivo.csv")

    rddCentralMap = rddCentral.map(lambda line: line.split(","))

    print('paso 1')

    # Build an explicit schema: one nullable string field per column name
    dfFields = ["ROWID_CDR", "DURACION", "FECHA_LLAMADA", "FECHA_LLAMADA_2", "MATCH"]
    dfSchema = StructType([StructField(field_name, StringType(), True) for field_name in dfFields])
    dfCentral = sqlContext.createDataFrame(rddCentralMap, dfSchema)
    dfCentral = dfCentral.withColumn("FECHA_LLAMADA_NUM", dfCentral.FECHA_LLAMADA_2.cast(IntegerType()))
    dfCentral = dfCentral.withColumn("DURACION_NUM", dfCentral.DURACION.cast(IntegerType()))
    dfCentral = dfCentral.withColumn("MATCH_NUM", dfCentral.MATCH.cast(IntegerType()))

    sc.stop()

Alternatively, note that createDataFrame takes an RDD as its first parameter; mapping the RDD created from reading the file may also be contributing to your issue.


1 Comment

This isn't related to the error in the question. Also, you can use spark.read.format("csv").load("file.csv") rather than using RDDs.
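
For reference, a minimal sketch of that reader-based approach, assuming the same column names and HDFS path as in the question and a Spark 2.x SparkSession entry point:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("Spark RDD").getOrCreate()

fields = ["ROWID_CDR", "DURACION", "FECHA_LLAMADA", "FECHA_LLAMADA_2", "MATCH"]
schema = StructType([StructField(name, StringType(), True) for name in fields])

# Read the CSV directly into a DataFrame; no intermediate RDD or split() needed
dfCentral = spark.read.format("csv").schema(schema).load("/user/spark/archivo.csv")
dfCentral = dfCentral.withColumn("MATCH_NUM", dfCentral.MATCH.cast(IntegerType()))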
