
I have a dataframe df like the following

+--------+--------------------+--------+------+
|      id|                path|somestff| hash1|
+--------+--------------------+--------+------+
|       1|/file/dirA/fileA.txt|      58| 65161|
|       2|/file/dirB/fileB.txt|      52| 65913|
|       3|/file/dirC/fileC.txt|      99|131073|
|       4|/file/dirF/fileD.txt|      46|196233|
+--------+--------------------+--------+------+

One note: the /file/dir parts differ. Not all files are stored in the same directory; in fact there are hundreds of files spread across various directories.

What I want to accomplish here is to read the file referenced in the path column, count the records in that file, and write the row count into a new column of the dataframe.

I tried the following function and udf:

def executeRowCount(fileCount: String): Long = {
  val rowCount = spark.read.format("csv").option("header", "false").load(fileCount).count
  rowCount
}

val execUdf = udf(executeRowCount _)

df.withColumn("row_count", execUdf (col("path"))).show()

This results in the following error

org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string) => bigint)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$10$$anon$1.hasNext(WholeStageCodegenExec.scala:614)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:253)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:830)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
        at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:28)
        at $line39.$read$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$iw$$anonfun$1.apply(<console>:25)
        ... 19 more

I tried iterating over the collected column like this:

val te = df.select("path").as[String].collect()
te.foreach(executeRowCount)

and there it works just fine, but I want to store the result in the df...

I've tried several solutions, but I'm facing a dead end here.


4 Answers


That does not work because data frames can only be created in the driver JVM, while the UDF code runs in the executor JVMs. What you can do instead is load the CSVs into a separate data frame and enrich the data with a file name column:

import org.apache.spark.sql.functions.input_file_name

val csvs = spark
  .read
  .format("csv")
  .load("/file/dir/")
  .withColumn("filename", input_file_name())

and then join the original df on the filename column.
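A rough sketch of that approach (the directory list is made up here, and the join condition is an assumption: input_file_name() returns a full URI, so the path column may need normalizing before an equality join matches):

import org.apache.spark.sql.functions.{count, input_file_name}

// Read all CSVs in one pass (multiple paths or globs are accepted),
// tag every row with its source file, and count rows per file.
val counts = spark.read
  .format("csv")
  .option("header", "false")
  .load("/file/dirA", "/file/dirB", "/file/dirC", "/file/dirF")
  .withColumn("filename", input_file_name())
  .groupBy("filename")
  .agg(count("filename").as("row_count"))

// input_file_name() yields e.g. "hdfs://.../file/dirA/fileA.txt", so the
// exact equality on "path" may need the scheme/prefix stripped first.
val result = df.join(counts, df("path") === counts("filename"), "left")
result.show()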


4 Comments

Hey ollik1, the /file/dir parts differ. Not all files are stored in the same directory; in fact there are hundreds of files in various directories. I cannot load everything from HDFS in one step. The files differ in size from a few hundred MB to a few GB.
@datanin It is possible to define multiple locations: stackoverflow.com/questions/24029873/… . The file loading is lazy, so it seems at least worth trying whether it works well.
A dataframe is a distributed collection of data, so I don't understand why I cannot use the UDF on the df. Nevertheless, the workaround with a filename column is a good idea and I'll try that one. If it works, this will be my solution. Thanks.
Hi ollik1, now I understand the problem with the driver/worker split. It's not the dataframe but the UDF that is not serializable.

I fixed this issue in the following way:

// Collect the paths to the driver and read each file there,
// yielding (path, rowCount) tuples.
val queue = df.select("path").as[String].collect()
val countResult = for (item <- queue) yield {
  (item, spark.read.format("csv").option("header", "false").load(item).count)
}

val df2 = spark.createDataFrame(countResult)

Afterwards I joined the df with df2...
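In case it helps, the join step might look roughly like this (renaming df2's default tuple columns _1/_2 is an assumption to make the join key explicit):

// createDataFrame on an Array[(String, Long)] produces columns _1 and _2;
// rename them so the join key matches the original df.
val counts = df2.toDF("path", "row_count")

val joined = df.join(counts, Seq("path"), "left")
joined.show()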

The problem here is, as @ollik1 mentioned, the driver/worker architecture of UDFs. The UDF is not serializable because it would need the SparkSession (for spark.read), which only exists on the driver.



What about this?

def executeRowCount = udf((fileCount: String) => {
  spark.read.format("csv").option("header", "false").load(fileCount).count
})

df.withColumn("row_count", executeRowCount(col("path"))).show()



Maybe something like this?

import org.apache.spark.sql.functions.{count, input_file_name}

sqlContext
  .read
  .format("csv")
  .load("/tmp/input/")
  .withColumn("filename", input_file_name())
  .groupBy("filename")
  .agg(count("filename").as("record_count"))
  .show()
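If the counts above are kept in a val (say recordCounts, a name made up here) instead of shown directly, joining them back onto the original df could be sketched as:

// recordCounts: the (filename, record_count) frame built above.
// input_file_name() returns a full URI, so the original "path" values may
// need the same prefix (or the URI stripped) before the equality matches.
val withCounts = df
  .join(recordCounts, df("path") === recordCounts("filename"), "left")
  .drop("filename")

withCounts.show()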

