1

I have a matrix, and the number of columns and rows is unknown.

One example Matrix is:

[5,1.3]
[1,5.2]

I want to convert it to a DataFrame. The column names can be arbitrary — how do I achieve this? This is my expected result:

    +-------------+----+
    |         _1  | _2 |
    +-------------+----+
    |5            |1.3 |
    |1            |5.2 |
    --------------------

2 Answers 2

1

I suggest you convert the matrix to an RDD and then convert the RDD to a DataFrame. It is not the most elegant approach, but it works fine in Spark 2.0.0.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.linalg._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext, SparkSession}
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

import scala.collection.mutable.ArrayBuffer
object mat2df {
    /**
     * Demo entry point: builds a 2x2 dense matrix, converts it to an RDD of
     * row vectors, and materializes it as a DataFrame with generated column
     * names (_1, _2, ...).
     *
     * Fixes over the original:
     *  - a single SparkSession owns the SparkContext (the original created a
     *    standalone SparkContext AND a second SparkSession with a different
     *    master, which is a defect in Spark 2.x);
     *  - the DataFrame is built from Row + an explicit schema, so it works
     *    for any number of columns (the original `case Array(p0, p1)` was a
     *    partial match that threw MatchError for non-2-column matrices);
     *  - the session is stopped to release resources.
     */
    def main(args: Array[String]): Unit = {
         val spark = SparkSession.builder
             .appName("mat2df")
             .master("local[1]")
             .getOrCreate()
         val sc = spark.sparkContext

         // Matrices.dense is column-major: columns are (5, 1) and (1.3, 5.2).
         val values = Array(5, 1, 1.3, 5.2)
         val mat = Matrices.dense(2, 2, values)

         // Convert a column-major Matrix into an RDD of row vectors.
         def toRDD(m: Matrix): RDD[Vector] = {
             val columns = m.toArray.grouped(m.numRows)
             val rows = columns.toSeq.transpose // regroup column-major data into rows
             val vectors = rows.map(row => new DenseVector(row.toArray))
             sc.parallelize(vectors)
         }

         // One Row per matrix row; schema width follows the matrix, so this
         // generalizes beyond the hard-coded 2-tuple of the original.
         val rowRdd = toRDD(mat).map(v => Row.fromSeq(v.toArray.toSeq))
         val schema = StructType(
             (1 to mat.numCols).map(i => StructField(s"_$i", DoubleType, nullable = false))
         )
         val df = spark.createDataFrame(rowRdd, schema)
         df.show()

         spark.stop()
    }
}
Sign up to request clarification or add additional context in comments.

Comments

1
/**
 * Converts an MLlib Matrix into a DataFrame where each matrix COLUMN becomes
 * one DataFrame row (numRows fields per row), with fields named
 * `<m_nodeColName>_0 .. <m_nodeColName>_{numRows-1}`.
 *
 * Fixes over the original:
 *  - uses the `sc` parameter instead of an undefined external `nodeContext`
 *    (the original also shadowed the `sc` parameter with a local SQLContext);
 *  - drops the `ids` buffer, which was built but never used;
 *  - builds the schema as an expression (no `var` + mutation loop);
 *  - reads `matrix.numRows` directly instead of consuming `rowIter` just to
 *    count it.
 *
 * @param sc            SparkContext used to parallelize the matrix columns
 * @param matrix        source matrix (each column becomes one Row)
 * @param m_nodeColName prefix for the generated field names
 * @return DataFrame with one row per matrix column
 */
def matrixToDataFrame(sc: SparkContext, matrix: Matrix, m_nodeColName: String): DataFrame = {
    // Each column vector becomes a Row of doubles.
    val rdd = sc.parallelize(matrix.colIter.toSeq).map(col => Row.fromSeq(col.toArray.toSeq))

    // A Row built from a column has matrix.numRows elements, so the schema
    // needs exactly that many fields.
    val schema = StructType(
        (0 until matrix.numRows).map(i => StructField(s"${m_nodeColName}_$i", DoubleType, nullable = true))
    )

    // Reuse (or lazily create) the session tied to the supplied context.
    SparkSession.builder.config(sc.getConf).getOrCreate().createDataFrame(rdd, schema)
}

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.