
I am trying to transpose a huge DataFrame (100M x 20K). Since the DataFrame is spread over multiple nodes and is difficult to collect on the driver, I would like to do the transpose by converting it to mllib matrices. The idea seems to have been tested elsewhere, so I followed this procedure:

import org.apache.spark.sql.functions._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

val df = sqlContext.read.parquet("temp/test.parquet").select("H1","H2","H3","H4")
val matrixColumns = df.columns

val rdd = df.select(array(matrixColumns:_*).as("arr")).as[Array[Int]].rdd
  .zipWithIndex()
  .map{ case(arr, index) => IndexedRow(index, Vectors.dense(arr.map(_.toDouble)))} 

val dm = new IndexedRowMatrix(rdd).toBlockMatrix().toLocalMatrix()

I noticed a possible typo and tried a substitution:

orig:
    val rdd = df.select(array(matrixColumns:_*).as("arr"))....

modified:
    val rdd = df.select(Array(matrixColumns:_*)).as("arr")...

However, neither version works for me, and the above change throws this error:

scala> df.select(Array(matrixColumns:_*)).as("arr")
              ^
       error: overloaded method select with alternatives:
         [U1](c1: org.apache.spark.sql.TypedColumn[org.apache.spark.sql.Row,U1]): org.apache.spark.sql.Dataset[U1] <and>
         (col: String,cols: String*)org.apache.spark.sql.DataFrame <and>
         (cols: org.apache.spark.sql.Column*)org.apache.spark.sql.DataFrame
        cannot be applied to (Array[String])

I am unsure if there is a version issue (I am using Spark 3.3.0) or if the problem is elsewhere. I would be grateful for any help in fixing the above error.

1 Answer


Change the select invocation to:

df.select(matrixColumns.head, matrixColumns.tail: _*)

or

import org.apache.spark.sql.functions.col

df.select(matrixColumns.map(col(_)):_*)
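
For context, here is a minimal sketch of how the corrected select might fit into the transpose pipeline from the question. It assumes the question's df (numeric columns read from Parquet; the path and column names are placeholders) and uses BlockMatrix.transpose so the transpose stays distributed instead of collecting a local matrix:

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix}
import org.apache.spark.sql.functions.col

// Assumed setup, mirroring the question: numeric columns read from Parquet.
val df = spark.read.parquet("temp/test.parquet").select("H1", "H2", "H3", "H4")
val matrixColumns = df.columns

// Build one dense mllib vector per row, indexed so row order is preserved.
val rdd = df.select(matrixColumns.map(col(_)): _*)
  .rdd
  .zipWithIndex()
  .map { case (row, index) =>
    IndexedRow(index, Vectors.dense(row.toSeq.map(_.toString.toDouble).toArray))
  }

// Transpose as a distributed BlockMatrix; only call toLocalMatrix() afterwards
// if the transposed result actually fits on the driver.
val transposed = new IndexedRowMatrix(rdd)
  .toBlockMatrix()
  .transpose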

4 Comments

While the modified approach worked, the performance was terrible, and it crashed after an hour even for relatively small data. I have to find some other way of doing the operation. Among the approaches I could get to work, for data that fits into RAM, Polars (in Python) seems to be the best solution.
That matrix is enormous. I suppose your matrix is sparse, isn't it? Work with sparse vectors in that case.
No, totally dense. 4x1M was the test data. I am trying now to collect columns one at a time, write a CSV (or something like that) and read again. Not sure if it will be able to load so many columns though.
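
For reference, a rough sketch of the column-at-a-time idea mentioned in the last comment, assuming the question's df, that a single column fits in driver memory, and that rows come back in a stable order (otherwise sort by a key column first); the output file name is a placeholder:

import org.apache.spark.sql.functions.col

// Hypothetical sketch: each source column becomes one line of the transposed CSV.
// Assumes one column fits in driver memory and that row order is consistent across selects.
val out = new java.io.PrintWriter("transposed.csv")
try {
  for (c <- df.columns) {
    val values = df.select(col(c)).rdd.map(_.get(0).toString).collect()
    out.println(values.mkString(","))
  }
} finally {
  out.close()
}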
