
I'm just getting started with Spark — PySpark, in my case.

I have a small school project that does not seem difficult, but I've been working on it for many days and still cannot get it to work.

I have to load images from a folder and extract their descriptors in order to perform a dimensionality reduction.

I created a Pyspark dataframe with the image paths, and now I would like to add a column with the descriptors.

Here's how I did it.

List of image paths:

    lst_path = []

    sub_folders = os.listdir(folder)
    print(sub_folders)

    for f in sub_folders[:1]:
        lst_categ = os.listdir(folder + f)
        for file in lst_categ:
            lst_path.append(folder + f + "/" + file)

    print("Number of images loaded:", len(lst_path))

    rdd = sc.parallelize(lst_path)
    row_rdd = rdd.map(lambda x: Row(x))
    df = spark.createDataFrame(row_rdd, ["path_img"])
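(As an aside, the same path list can be built more compactly with `glob` — a sketch assuming the `folder/<sub_folder>/<file>` layout used above; `list_image_paths` is a hypothetical helper, not part of the original code.)

```python
import glob
import os

def list_image_paths(folder):
    # Match every file exactly one level below the sub-folders,
    # e.g. Training/Apple-Braeburn/0.jpg, in a single call.
    return sorted(glob.glob(os.path.join(folder, "*", "*")))
```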

Function to extract the descriptors:

    def get_desc(img):
        img = cv2.imread(file)
        orb = cv2.ORB_create(nfeatures=50)
        keypoints_orb, desc = orb.detectAndCompute(img, None)

        desc = desc.flatten()

        return desc

The UDF:

    udf_image = udf(lambda img: get_desc(img), ArrayType(FloatType()))

Creation of the new column:

    df2 = df.withColumn("img_vectorized", udf_image("path_img"))

The result with printSchema():

    root
     |-- path_img: string (nullable = true)
     |-- img_vectorized: array (nullable = true)
     |    |-- element: float (containsNull = true)

And when I call df2.show(), I get the following error message:

    Py4JJavaError: An error occurred while calling o773.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 18.0 failed 1 times, most recent failure: Lost task 0.0 in stage 18.0 (TID 93, localhost, executor driver): net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for numpy.core.multiarray._reconstruct)

    AttributeError: 'NoneType' object has no attribute 'flatten'

I note that the descriptors come back null. Yet when I run this extraction on a single image, it works.

I don't understand why it doesn't work on my dataframe. Can you help me please?

Thanks.

1 Answer

I found the solution last night, after many days of research.

My corrected code:

    def get_desc(img):
        image = cv2.imread(img)
        orb = cv2.ORB_create(nfeatures=50)
        keypoints_orb, desc = orb.detectAndCompute(image, None)

        # detectAndCompute returns None when no keypoints are found;
        # returning None here yields a null cell, filtered out below.
        if desc is None:
            return None

        # Convert the numpy array to a plain Python list of ints,
        # which is what ArrayType(IntegerType()) expects.
        return desc.flatten().tolist()

    udf_image = udf(get_desc, ArrayType(IntegerType()))

    df_desc = df.withColumn("descriptors", udf_image("path_img"))
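The .tolist() call is what makes the earlier pickle error go away: a plain Python UDF must return built-in types, and Spark cannot unpickle numpy arrays on the JVM side. IntegerType also fits here, since ORB descriptors are uint8 values in 0–255. A minimal sketch of the conversion, with no Spark needed to see the type difference (the sample values are taken from the output rows below):

```python
import numpy as np

# ORB's detectAndCompute returns a (n_keypoints, 32) uint8 array.
desc = np.array([[69, 113], [253, 10]], dtype=np.uint8)

# flatten() is still a numpy array; tolist() converts each element
# to a built-in Python int, safe for ArrayType(IntegerType()).
flat = desc.flatten().tolist()
# flat == [69, 113, 253, 10]
```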

    df_desc = df_desc.filter(df_desc.descriptors.isNotNull())

    df_desc.show()

    +--------------------+--------------------+
    |            path_img|         descriptors|
    +--------------------+--------------------+
    |Training/Apple-Br...|[69, 113, 253, 10...|
    |Training/Apple-Br...|[212, 236, 159, 2...|
    |Training/Apple-Br...|[60, 53, 123, 239...|
    |Training/Apple-Br...|[255, 189, 252, 1...|
    |Training/Apple-Br...|[204, 244, 149, 1...|
    +--------------------+--------------------+
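For the dimensionality-reduction step the question mentions, the descriptor column can be collected back to the driver and reduced with a small PCA via SVD — a sketch under the assumption that every row's descriptor list has the same length (`pca_reduce` is a hypothetical helper, not from the original post):

```python
import numpy as np

def pca_reduce(X, n_components=2):
    # Center each feature, then project onto the top right-singular
    # vectors (the principal components) obtained from the SVD.
    X = np.asarray(X, dtype=float)
    X_centered = X - X.mean(axis=0)
    U, S, Vt = np.linalg.svd(X_centered, full_matrices=False)
    return X_centered @ Vt[:n_components].T

# Usage sketch, assuming equal-length descriptor rows:
#   rows = [r.descriptors for r in df_desc.collect()]
#   reduced = pca_reduce(rows, n_components=2)
```

For a large image set, running the reduction on the driver only works if the collected descriptors fit in memory; otherwise Spark MLlib's distributed PCA would be the natural next step.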