I tried adding a row-number column using zipWithIndex in Spark (Scala), as below:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val df = sc.parallelize(Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")
val rddzip = df.rdd.zipWithIndex
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))
val dfZippedWithId = spark.createDataFrame(rddzip.map { case (row, index) => Row.fromSeq(row.toSeq ++ Array(index)) }, newSchema)

Now I am trying to do the same thing in Java, as below:

JavaRDD<Row> rdd = (JavaRDD) df.toJavaRDD().zipWithIndex().map(t -> {
    Row r = t._1;
    Long index = t._2 + 1;
    ArrayList<Object> list = new ArrayList<>();
    for(Object item: JavaConverters.seqAsJavaListConverter(r.toSeq()).asJava()) {
        list.add(item);
    }
    return RowFactory.create(JavaConverters.seqAsJavaListConverter(t._1.toSeq()).asJava().add(t._2));
});
StructType newSchema = df.schema()
        .add(new StructField(name, DataTypes.LongType, true, Metadata.empty()));
return df.sparkSession().createDataFrame(rdd, newSchema);

But I get the error below:

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 523, localhost, executor driver): java.lang.UnsupportedOperationException
        at java.util.AbstractList.add(Unknown Source)
        at java.util.AbstractList.add(Unknown Source)
        at com.nielsen.media.mediaView.adintel.pivot.datareader.AIReportViewerProcessor.lambda$zipWithIndex$7245ab51$1(AIReportViewerProcessor.java:3071)
        at org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1.apply(JavaPairRDD.scala:1040)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
        at org.apache.spark.scheduler.Task.run(Task.scala:121)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
        at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1877)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1876)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)

Any help?

1 Answer

In the Scala version you pass an RDD[Row] to spark.createDataFrame; in Java, make sure the JavaPairRDD<Row, Long> from zipWithIndex is mapped to a JavaRDD<Row>. The UnsupportedOperationException itself comes from calling add on the list returned by asJava(), which is an immutable view of the row's Scala Seq; build the new row from a fresh array instead:

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.Metadata;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    Dataset<Row> df = ss.range(10).toDF();
    df.show();

    // zipWithIndex pairs every row with its index: JavaPairRDD<Row, Long>
    JavaPairRDD<Row, Long> rddzip = df.toJavaRDD().zipWithIndex();
    // Map each (row, index) pair to a plain Row with the index appended
    JavaRDD<Row> rdd = rddzip.map(s -> {
        Row r = s._1;
        Object[] arr = new Object[r.size() + 1];
        for (int i = 0; i < arr.length - 1; i++) {
            arr[i] = r.get(i);
        }
        arr[arr.length - 1] = s._2;
        return RowFactory.create(arr);
    });

    // Extend the original schema with the new non-nullable long column
    StructType newSchema = df.schema().add(new StructField("rowid",
            DataTypes.LongType, false, Metadata.empty()));

    Dataset<Row> df2 = ss.createDataFrame(rdd, newSchema);
    df2.show();

The two show() calls print:
    +---+
    | id|
    +---+
    |  0|
    |  1|
    |  2|
    |  3|
    |  4|
    |  5|
    |  6|
    |  7|
    |  8|
    |  9|
    +---+

    +---+-----+
    | id|rowid|
    +---+-----+
    |  0|    0|
    |  1|    1|
    |  2|    2|
    |  3|    3|
    |  4|    4|
    |  5|    5|
    |  6|    6|
    |  7|    7|
    |  8|    8|
    |  9|    9|
    +---+-----+
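
Incidentally, the question's original asJava()-based lambda can also be salvaged: the list returned by asJava() is immutable, but copying it into a mutable ArrayList before appending the index works. A minimal sketch of that variant (reusing df and newSchema from above):

    // Copy the immutable asJava() view into a mutable ArrayList, append
    // the index, then unpack the list into RowFactory.create(Object...)
    JavaRDD<Row> rdd = df.toJavaRDD().zipWithIndex().map(t -> {
        java.util.List<Object> cols = new java.util.ArrayList<>(
                scala.collection.JavaConverters.seqAsJavaListConverter(t._1.toSeq()).asJava());
        cols.add(t._2); // ArrayList supports add, unlike the asJava() view
        return RowFactory.create(cols.toArray());
    });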


1 Comment

Yeah, the mapping is the part I wasn't able to work out in Java.
